Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm;

import org.apache.ratis.thirdparty.io.grpc.CallOptions;
import org.apache.ratis.thirdparty.io.grpc.Channel;
import org.apache.ratis.thirdparty.io.grpc.ClientCall;
import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor;
import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor;
import org.apache.ratis.thirdparty.io.grpc.Status;

/**
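* Client-side gRPC interceptor that records call failures in
* {@link XceiverClientMetrics}: calls closing with UNAUTHENTICATED are counted
* as authentication failures, and calls closing with UNAVAILABLE or
* DEADLINE_EXCEEDED are counted as connection failures.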
*/
public class GrpcClientMetricsInterceptor implements ClientInterceptor {

private final XceiverClientMetrics metrics;

/**
* Creates an interceptor that reports gRPC call failures to the given
* metrics object.
*
* @param metrics counters incremented when an intercepted call closes with a
*     tracked failure status; the reference is stored as-is (no null check)
*/
public GrpcClientMetricsInterceptor(XceiverClientMetrics metrics) {
this.metrics = metrics;
}
Comment on lines +33 to +39

/**
 * Wraps the outgoing call so that its terminal status is inspected, and the
 * matching failure counter is incremented, before the close event is handed
 * on to the caller's listener.
 *
 * @param method descriptor of the RPC being invoked
 * @param callOptions options for this call, passed through unchanged
 * @param next channel used to create the underlying call
 * @return a forwarding call whose listener counts tracked failures
 */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
    MethodDescriptor<ReqT, RespT> method, CallOptions callOptions,
    Channel next) {

  final ClientCall<ReqT, RespT> delegate = next.newCall(method, callOptions);

  return new SimpleForwardingClientCall<ReqT, RespT>(delegate) {

    @Override
    public void start(Listener<RespT> responseListener, Metadata headers) {
      final Listener<RespT> countingListener =
          new SimpleForwardingClientCallListener<RespT>(responseListener) {
            @Override
            public void onClose(Status status, Metadata trailers) {
              switch (status.getCode()) {
              case UNAUTHENTICATED:
                metrics.incGrpcAuthenticationFailures();
                break;
              case UNAVAILABLE:
              case DEADLINE_EXCEEDED:
                metrics.incGrpcConnectionFailures();
                break;
              default:
                // OK and all remaining status codes are not counted.
                break;
              }
              // Always propagate the close event to the wrapped listener.
              super.onClose(status, trailers);
            }
          };
      super.start(countingListener, headers);
    }
  };
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class XceiverClientMetrics implements MetricsSource {
private @Metric MutableCounterLong totalOps;
private @Metric MutableCounterLong ecReconstructionTotal;
private @Metric MutableCounterLong ecReconstructionFailsTotal;
private @Metric MutableCounterLong grpcAuthenticationFailures;
private @Metric MutableCounterLong grpcConnectionFailures;
private EnumMap<ContainerProtos.Type, MutableCounterLong> pendingOpsArray;
private EnumMap<ContainerProtos.Type, MutableCounterLong> opsArray;
private EnumMap<ContainerProtos.Type, PerformanceMetrics> containerOpsLatency;
Expand Down Expand Up @@ -115,6 +117,14 @@ public void incECReconstructionFailsTotal() {
ecReconstructionFailsTotal.incr();
}

/**
* Increments the {@code grpcAuthenticationFailures} counter.
* {@code GrpcClientMetricsInterceptor} calls this when a gRPC call closes
* with status UNAUTHENTICATED.
*/
public void incGrpcAuthenticationFailures() {
grpcAuthenticationFailures.incr();
}

/**
* Increments the {@code grpcConnectionFailures} counter.
* {@code GrpcClientMetricsInterceptor} calls this when a gRPC call closes
* with status UNAVAILABLE or DEADLINE_EXCEEDED.
*/
public void incGrpcConnectionFailures() {
grpcConnectionFailures.incr();
}

@VisibleForTesting
public long getTotalOpCount() {
return totalOps.value();
Expand Down Expand Up @@ -144,6 +154,8 @@ public void getMetrics(MetricsCollector collector, boolean b) {
totalOps.snapshot(recordBuilder, true);
ecReconstructionTotal.snapshot(recordBuilder, true);
ecReconstructionFailsTotal.snapshot(recordBuilder, true);
grpcAuthenticationFailures.snapshot(recordBuilder, true);
grpcConnectionFailures.snapshot(recordBuilder, true);

for (ContainerProtos.Type type : ContainerProtos.Type.values()) {
pendingOpsArray.get(type).snapshot(recordBuilder, b);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hdds.scm;

import static org.apache.ozone.test.MetricsAsserts.assertCounter;
import static org.apache.ozone.test.MetricsAsserts.getMetrics;

import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.junit.jupiter.api.Test;

/**
* Unit test for XceiverClientMetrics.
*/
public class TestXceiverClientMetricsUnit {

  /**
   * Bumps each gRPC failure counter a known number of times and checks that
   * the published metrics record reports exactly those totals.
   */
  @Test
  public void testGrpcFailures() {
    final XceiverClientMetrics clientMetrics = XceiverClientMetrics.create();
    try {
      // One authentication failure, two connection failures.
      clientMetrics.incGrpcAuthenticationFailures();
      for (int i = 0; i < 2; i++) {
        clientMetrics.incGrpcConnectionFailures();
      }

      final MetricsRecordBuilder snapshot =
          getMetrics(XceiverClientMetrics.SOURCE_NAME);
      assertCounter("GrpcAuthenticationFailures", 1L, snapshot);
      assertCounter("GrpcConnectionFailures", 2L, snapshot);
    } finally {
      // Always unregister the metrics source, even if an assertion fails.
      clientMetrics.unRegister();
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.hadoop.ozone.HddsDatanodeStopService;
import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient;
import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.ReportManager;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler;
Expand All @@ -66,6 +67,7 @@
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator;
import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics;
Expand All @@ -80,6 +82,7 @@
import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor;
import org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics;
import org.apache.hadoop.ozone.container.replication.ReplicationTask;
import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader;
import org.apache.hadoop.ozone.container.upgrade.DataNodeUpgradeFinalizer;
import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
Expand Down Expand Up @@ -195,6 +198,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
}
nextHB = new AtomicLong(Time.monotonicNow());

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);

ContainerImporter importer = new ContainerImporter(conf,
container.getContainerSet(),
container.getController(),
Expand All @@ -205,20 +211,36 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService,
importer,
new SimpleContainerDownloader(conf, certClient));
ContainerReplicator pushReplicator = new PushReplicator(conf,
new OnDemandContainerReplicationSource(container.getController()),
new OnDemandContainerReplicationSource(container.getController(),
replicationConfig),
new GrpcContainerUploader(conf, certClient, container.getController())
);

pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull");
pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push");

ReplicationConfig replicationConfig =
conf.getObject(ReplicationConfig.class);
supervisor = ReplicationSupervisor.newBuilder()
.stateContext(context)
.datanodeConfig(dnConf)
.replicationConfig(replicationConfig)
.clock(clock)
.volumeChooser(task -> {
if (task instanceof ReplicationTask) {
ReplicationTask repTask = (ReplicationTask) task;
if (repTask.getTarget() == null) {
try {
return importer.chooseNextVolume(importer.getDefaultReplicationSpace());

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This volume selection has a reservation side effect: chooseNextVolume() increments the target volume's committed bytes. Doing it here makes the reservation lifetime hard to pair with execution.

If the task is later dropped before DownloadAndImportReplicator runs, the reservation is leaked. Conversely, if a pull attempt hits REPLICATION_LIMIT_REACHED, DownloadAndImportReplicator releases the reservation, marks the same task QUEUED, and the retry reuses task.getVolume() without reserving again, so committed bytes can be decremented repeatedly.

Can we move volume selection/reservation into each actual pull attempt, or explicitly track whether the task currently owns a reservation?

} catch (IOException e) {
LOG.error("Failed to choose volume for replication task " + task, e);
return null;
}
} else {
Container localContainer = getContainer().getContainerSet().getContainer(task.getContainerId());
return localContainer != null ? (HddsVolume) localContainer.getContainerData().getVolume() : null;
}
}
return null;
})
.build();

replicationSupervisorMetrics =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -109,6 +110,9 @@ public class HddsVolume extends StorageVolume {
private final AtomicBoolean dbLoaded = new AtomicBoolean(false);
private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false);

private final AtomicInteger activeOutboundReplications =
new AtomicInteger(0);

/**
* Builder for HddsVolume.
*/
Expand Down Expand Up @@ -701,4 +705,16 @@ public void compactDb() {
LOG.warn("compact rocksdb error in {}", dbFilePath, e);
}
}

/**
* Atomically adds one to this volume's {@code activeOutboundReplications}
* counter.
*
* @return the counter value after the increment
*/
public int incActiveOutboundReplications() {
return activeOutboundReplications.incrementAndGet();
}

/**
* Atomically subtracts one from this volume's
* {@code activeOutboundReplications} counter. There is no lower bound here,
* so a decrement without a matching increment drives the value negative.
*
* @return the counter value after the decrement
*/
public int decActiveOutboundReplications() {
return activeOutboundReplications.decrementAndGet();
}

/**
* Reads the current value of this volume's
* {@code activeOutboundReplications} counter.
*
* @return the current counter value
*/
public int getActiveOutboundReplications() {
return activeOutboundReplications.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.time.Instant;
import java.time.ZoneId;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;

/**
* Abstract class to capture common variables and methods for different types
Expand All @@ -34,7 +35,7 @@ public abstract class AbstractReplicationTask {

private final long containerId;

private final Instant queued;
private Instant queued;

private final long deadlineMsSinceEpoch;

Expand All @@ -44,6 +45,8 @@ public abstract class AbstractReplicationTask {

private boolean shouldOnlyRunOnInServiceDatanodes = true;

private HddsVolume volume;

protected AbstractReplicationTask(long containerID,
long deadlineMsSinceEpoch, long term) {
this(containerID, deadlineMsSinceEpoch, term,
Expand Down Expand Up @@ -78,6 +81,10 @@ public Instant getQueued() {
return queued;
}

/**
* Overwrites the queued timestamp with the current instant, as read from
* {@link Instant#now()} (the system clock).
*/
public void updateQueuedTime() {
this.queued = Instant.now();
}

/**
* @return the term value held by this task
*/
public long getTerm() {
return term;
}
Expand Down Expand Up @@ -152,6 +159,14 @@ public String toString() {
return sb.toString();
}

/**
* @return the volume currently attached to this task via
*     {@link #setVolume(HddsVolume)}; the field has no initializer, so this
*     is presumably {@code null} until a volume is set
*/
public HddsVolume getVolume() {
return volume;
}

/**
* Attaches a volume to this task, replacing any previously stored one.
*
* @param volume the volume to store; no validation is performed
*/
public void setVolume(HddsVolume volume) {
this.volume = volume;
}

/**
* ENUM representing the different status values a replication task can
* have.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.replication;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
Expand All @@ -34,6 +35,6 @@ public interface ContainerDownloader extends Closeable {

Path getContainerDataFromReplicas(long containerId,
List<DatanodeDetails> sources, Path downloadDir,
CopyContainerCompression compression);
CopyContainerCompression compression) throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private static void deleteFileQuietely(Path tarFilePath) {
}
}

HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
public HddsVolume chooseNextVolume(long spaceToReserve) throws IOException {
// Choose volume that can hold both container in tmp and dest directory
LOG.debug("Choosing volume to reserve space : {}", spaceToReserve);
return volumeChoosingPolicy.chooseVolume(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status;
Expand Down Expand Up @@ -70,11 +72,13 @@ public void replicate(ReplicationTask task) {

LOG.info("Starting replication of container {} from {} using {}",
containerID, sourceDatanodes, compression);
HddsVolume targetVolume = null;

HddsVolume targetVolume = task.getVolume();
try {
targetVolume = containerImporter.chooseNextVolume(
containerImporter.getDefaultReplicationSpace());
if (targetVolume == null) {
targetVolume = containerImporter.chooseNextVolume(
containerImporter.getDefaultReplicationSpace());
task.setVolume(targetVolume);
}

// Wait for the download. This thread pool is limiting the parallel
// downloads, so it's ok to block here and wait for the full download.
Expand All @@ -96,8 +100,16 @@ public void replicate(ReplicationTask task) {
LOG.info("Container {} is replicated successfully", containerID);
task.setStatus(Status.DONE);
} catch (IOException e) {
LOG.error("Container {} replication was unsuccessful.", containerID, e);
task.setStatus(Status.FAILED);
if (e instanceof StorageContainerException &&
((StorageContainerException) e).getResult() ==
ContainerProtos.Result.REPLICATION_LIMIT_REACHED) {
LOG.info("Container {} replication will be retried as the " +
"replication limit was reached.", containerID);
task.setStatus(Status.QUEUED);
} else {
LOG.error("Container {} replication was unsuccessful.", containerID, e);
task.setStatus(Status.FAILED);
}
} finally {
if (targetVolume != null) {
targetVolume.incCommittedBytes(-containerImporter.getDefaultReplicationSpace());
Expand Down
Loading