From b4d6eedc32f2cca46eb837c5e794899bf60d645f Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 21 Apr 2026 14:01:54 -0700 Subject: [PATCH 1/5] HDDS-15071. [SCM] Add configuration and global reconstruction limit Change-Id: Ifa6911e468f073ac2cc848edc3da20dc377c983c --- .../replication/OverReplicatedProcessor.java | 5 + .../replication/ReplicationManager.java | 111 ++++++++++++++++++ .../replication/UnderReplicatedProcessor.java | 5 + .../UnhealthyReplicationProcessor.java | 13 ++ .../replication/TestReplicationManager.java | 68 +++++++++++ .../TestUnderReplicatedProcessor.java | 21 ++++ 6 files changed, 223 insertions(+) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java index fff263fee341..6de6cc771883 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/OverReplicatedProcessor.java @@ -54,6 +54,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm, return false; } + @Override + protected boolean reconstructionLimitReached(ReplicationManager rm) { + return false; + } + @Override protected int sendDatanodeCommands( ReplicationManager replicationManager, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index 8cd8444d1d2f..b2696f279dfc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -37,6 +37,7 @@ import 
java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -159,6 +160,18 @@ public class ReplicationManager implements SCMService, ContainerReplicaPendingOp private final Map excludedNodes = new ConcurrentHashMap<>(); + /** + * Track the number of active EC reconstruction commands across the cluster. + */ + private final AtomicInteger inflightReconstructionCount = new AtomicInteger(0); + + /** + * Mapping from reconstruction command ID to the number of pending fragments + * for that command. Used to know when the whole command is finished. + */ + private final Map reconstructionCommandIdToPendingFragmentCount = + new ConcurrentHashMap<>(); + /** * SCMService related variables. * After leaving safe mode, replicationMonitor needs to wait for a while @@ -422,6 +435,33 @@ public long getInflightReplicationCount() { .getPendingOpCount(ContainerReplicaOp.PendingOpType.ADD); } + /** + * Returns the number of active EC reconstruction commands currently in + * progress across the cluster. + */ + public int getInflightReconstructionCount() { + return inflightReconstructionCount.get(); + } + + /** + * Returns the maximum number of inflight reconstruction commands allowed + * across the cluster at any given time. + * @return the maximum number of inflight reconstruction commands allowed + */ + public int getReconstructionInFlightLimit() { + return rmConf.getReconstructionGlobalLimit(); + } + + /** + * Returns true if the number of inflight reconstruction commands has reached + * the global limit. 
+ * @return true if the limit is reached, false otherwise + */ + public boolean isReconstructionLimitReached() { + int limit = getReconstructionInFlightLimit(); + return limit > 0 && getInflightReconstructionCount() >= limit; + } + /** * Sends delete container command for the given container to the given * datanode. @@ -697,6 +737,8 @@ private void adjustPendingOpsAndMetrics(ContainerInfo containerInfo, containerReplicaPendingOps.scheduleAddReplica(containerInfo.containerID(), targets.get(i), targetIndexes.byteAt(i), cmd, scmDeadlineEpochMs, requiredSize, clock.millis()); } + inflightReconstructionCount.incrementAndGet(); + reconstructionCommandIdToPendingFragmentCount.put(cmd.getId(), targetIndexes.size()); getMetrics().incrEcReconstructionCmdsSentTotal(); } else if (cmd.getType() == Type.replicateContainerCommand) { ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd; @@ -1074,6 +1116,19 @@ ReplicationQueue getQueue() { @Override public void opCompleted(ContainerReplicaOp op, ContainerID containerID, boolean timedOut) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD + && op.getCommand() != null + && op.getCommand().getType() == Type.reconstructECContainersCommand) { + long cmdId = op.getCommand().getId(); + // Only tracked command ids may release a slot; an unknown id must not drive the counter negative. + reconstructionCommandIdToPendingFragmentCount.computeIfPresent(cmdId, (k, v) -> { + if (v <= 1) { + inflightReconstructionCount.decrementAndGet(); + } + return v <= 1 ? null : v - 1; + }); + } + if (!(timedOut && op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE)) { // We only care about expired delete ops. All others should be ignored. 
return; @@ -1291,6 +1346,38 @@ public static class ReplicationManagerConfiguration ) private int containerSampleLimit = 100; + @Config(key = "hdds.scm.replication.decommission.ec.reconstruction.enabled", + type = ConfigType.BOOLEAN, + defaultValue = "false", + reconfigurable = true, + tags = { SCM }, + description = "If true, SCM will switch from 1-1 replication to " + + "multi-source reconstruction for EC containers on decommissioning " + + "nodes when the node's load exceeds the threshold." + ) + private boolean ecDecommissionReconstructionEnabled = false; + + @Config(key = "hdds.scm.replication.decommission.ec.reconstruction.load.factor", + type = ConfigType.DOUBLE, + defaultValue = "0.9", + reconfigurable = true, + tags = { SCM }, + description = "The threshold factor (between 0 and 1) of a node's " + + "replication limit at which SCM switches to reconstruction for " + + "EC decommission. Default is 0.9." + ) + private double ecDecommissionReconstructionLoadFactor = 0.9; + + @Config(key = "hdds.scm.replication.reconstruction.global.limit", + type = ConfigType.INT, + defaultValue = "50", + reconfigurable = true, + tags = { SCM }, + description = "A cluster-wide limit to restrict the total number of " + + "active EC reconstruction commands across the cluster." 
+ ) + private int reconstructionGlobalLimit = 50; + @Config(key = "hdds.scm.replication.quasi.closed.stuck.best.origin.copies", type = ConfigType.INT, defaultValue = "3", @@ -1347,6 +1434,30 @@ public void setDatanodeReplicationLimit(int limit) { this.datanodeReplicationLimit = limit; } + public boolean isEcDecommissionReconstructionEnabled() { + return ecDecommissionReconstructionEnabled; + } + + public void setEcDecommissionReconstructionEnabled(boolean enabled) { + this.ecDecommissionReconstructionEnabled = enabled; + } + + public double getEcDecommissionReconstructionLoadFactor() { + return ecDecommissionReconstructionLoadFactor; + } + + public void setEcDecommissionReconstructionLoadFactor(double factor) { + this.ecDecommissionReconstructionLoadFactor = factor; + } + + public int getReconstructionGlobalLimit() { + return reconstructionGlobalLimit; + } + + public void setReconstructionGlobalLimit(int limit) { + this.reconstructionGlobalLimit = limit; + } + public void setMaintenanceRemainingRedundancy(int redundancy) { this.maintenanceRemainingRedundancy = redundancy; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java index 8f291158902a..20b8d05946bb 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java @@ -52,6 +52,11 @@ protected boolean inflightOperationLimitReached(ReplicationManager rm, return rm.getInflightReplicationCount() >= pendingOpLimit; } + @Override + protected boolean reconstructionLimitReached(ReplicationManager rm) { + return rm.isReconstructionLimitReached(); + } + @Override protected int sendDatanodeCommands( ReplicationManager replicationManager, diff 
--git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java index 6508b73c10e6..c37aa7dd7058 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationProcessor.java @@ -76,6 +76,13 @@ protected abstract void requeueHealthResult( protected abstract boolean inflightOperationLimitReached( ReplicationManager rm, long inflightLimit); + /** + * Check if the reconstruction operation limit is reached. + * @param rm The ReplicationManager instance + * @return True if the limit is reached, false otherwise. + */ + protected abstract boolean reconstructionLimitReached(ReplicationManager rm); + /** * Read messages from the ReplicationManager under replicated queue and, * form commands to correct replication. The commands are added @@ -105,6 +112,12 @@ public void processAll(ReplicationQueue queue) { .getMetrics().incrPendingReplicationLimitReachedTotal(); break; } + if (reconstructionLimitReached(replicationManager)) { + LOG.info("The maximum number of pending reconstruction commands ({}) " + + "are scheduled. 
Ending the iteration.", + replicationManager.getReconstructionInFlightLimit()); + break; + } HealthResult healthResult = dequeueHealthResultFromQueue(queue); if (healthResult == null) { diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java index 1d7d619efb0f..393d8b66f6e3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java @@ -48,7 +48,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import com.google.protobuf.UnsafeByteOperations; import java.io.IOException; import java.time.Instant; @@ -1801,4 +1803,70 @@ private void mockReplicationCommandCounts( }); } + @Test + public void testInflightReconstructionLimit() throws IOException, NodeNotFoundException { + rmConf.setReconstructionGlobalLimit(2); + ReplicationManager rm = createReplicationManager(); + assertEquals(2, rm.getReconstructionInFlightLimit()); + assertEquals(0, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + mockReplicationCommandCounts(dn -> 0, dn -> 0); + + ContainerInfo container = ReplicationTestUtil.createContainerInfo( + repConfig, 1, HddsProtos.LifeCycleState.CLOSED, 10, 20); + + // Send one reconstruction command with 2 fragments + ReconstructECContainersCommand cmd1 = new ReconstructECContainersCommand( + 1L, Collections.emptyList(), + ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails(), + MockDatanodeDetails.randomDatanodeDetails()), + integers2ByteString(ImmutableList.of(1, 2)), (ECReplicationConfig) 
repConfig); + + rm.sendThrottledReconstructionCommand(container, cmd1); + assertEquals(1, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + // Send another reconstruction command with 1 fragment + ReconstructECContainersCommand cmd2 = new ReconstructECContainersCommand( + 2L, Collections.emptyList(), + ImmutableList.of(MockDatanodeDetails.randomDatanodeDetails()), + integers2ByteString(ImmutableList.of(3)), (ECReplicationConfig) repConfig); + rm.sendThrottledReconstructionCommand(container, cmd2); + assertEquals(2, rm.getInflightReconstructionCount()); + assertTrue(rm.isReconstructionLimitReached()); + + // Complete one fragment of cmd1 + ContainerReplicaOp op1 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd1.getTargetDatanodes().get(0), 1, cmd1, Long.MAX_VALUE, 0); + rm.opCompleted(op1, container.containerID(), false); + // Still 2 because cmd1 is not fully finished + assertEquals(2, rm.getInflightReconstructionCount()); + + // Complete second fragment of cmd1 + ContainerReplicaOp op2 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd1.getTargetDatanodes().get(1), 2, cmd1, Long.MAX_VALUE, 0); + rm.opCompleted(op2, container.containerID(), false); + // Now 1 + assertEquals(1, rm.getInflightReconstructionCount()); + assertFalse(rm.isReconstructionLimitReached()); + + // Complete cmd2 + ContainerReplicaOp op3 = new ContainerReplicaOp( + ContainerReplicaOp.PendingOpType.ADD, + cmd2.getTargetDatanodes().get(0), 3, cmd2, Long.MAX_VALUE, 0); + rm.opCompleted(op3, container.containerID(), false); + assertEquals(0, rm.getInflightReconstructionCount()); + } + + private static ByteString integers2ByteString(List src) { + byte[] dst = new byte[src.size()]; + for (int i = 0; i < src.size(); i++) { + dst[i] = src.get(i).byteValue(); + } + return dst.length > 0 ? 
UnsafeByteOperations.unsafeWrap(dst) + : ByteString.EMPTY; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java index de93f7b9194d..958da96239a8 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java @@ -131,4 +131,25 @@ public void testMessageNotProcessedIfGlobalLimitReached() throws IOException { assertEquals(1, rmMetrics.getPendingReplicationLimitReachedTotal()); } + @Test + public void testMessageNotProcessedIfReconstructionLimitReached() + throws IOException { + when(replicationManager.isReconstructionLimitReached()).thenReturn(true); + when(replicationManager.getReconstructionInFlightLimit()).thenReturn(10); + when(replicationManager.processUnderReplicatedContainer(any())).thenReturn(1); + + ContainerInfo container = ReplicationTestUtil + .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); + UnderReplicatedHealthResult result = new UnderReplicatedHealthResult( + container, 3, false, false, false); + queue.enqueue(result); + + underReplicatedProcessor.processAll(queue); + + // The message should not be processed and still be on the queue (re-queued) + assertEquals(1, queue.underReplicatedQueueSize()); + // We should not have processed anything in RM + verify(replicationManager, times(0)).processUnderReplicatedContainer(any()); + } + } From 1dd77716355c90fd66509f810d72b345c6dc07b2 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 21 Apr 2026 14:07:25 -0700 Subject: [PATCH 2/5] HDDS-15072. 
[SCM] Implement dynamic load-based switching to reconstruction for EC decommission Change-Id: Ie4b8662b10de5d3e73312f6b18ac825853dfd72e --- .../ECUnderReplicationHandler.java | 224 +++++++++++------- .../replication/ReplicationManager.java | 21 ++ .../TestECUnderReplicationHandler.java | 50 ++++ 3 files changed, 215 insertions(+), 80 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java index 0bf886569c9d..6473ac20b9b8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java @@ -278,17 +278,40 @@ private int processMissingIndexes( List availableSourceNodes, List excludedNodes, List usedNodes) throws IOException { + List missingIndexes = replicaCount.unavailableIndexes(true); + if (missingIndexes.isEmpty()) { + return 0; + } + return processReconstruction(replicaCount, missingIndexes, sources, + availableSourceNodes, excludedNodes, usedNodes); + } + + /** + * Core logic to schedule an EC reconstruction command. 
+ * + * @param replicaCount the current replica count of the container + * @param missingIndexes the indexes that need to be reconstructed + * @param sources available source replicas + * @param availableSourceNodes healthy source nodes + * @param excludedNodes nodes to be excluded from target selection + * @param usedNodes nodes already used for this container + * @return number of commands sent + * @throws IOException if an error occurs + */ + private int processReconstruction( + ECContainerReplicaCount replicaCount, + List missingIndexes, + Map> sources, + List availableSourceNodes, + List excludedNodes, + List usedNodes) throws IOException { ContainerInfo container = replicaCount.getContainer(); ECReplicationConfig repConfig = - (ECReplicationConfig)container.getReplicationConfig(); - List missingIndexes = replicaCount.unavailableIndexes(true); - LOG.debug("Processing missing indexes {} for container {}.", missingIndexes, - container.containerID()); + (ECReplicationConfig) container.getReplicationConfig(); + LOG.debug("Processing reconstruction of indexes {} for container {}.", + missingIndexes, container.containerID()); final int expectedTargetCount = missingIndexes.size(); boolean recoveryIsCritical = expectedTargetCount == repConfig.getParity(); - if (expectedTargetCount == 0) { - return 0; - } int commandsSent = 0; if (sources.size() >= repConfig.getData()) { @@ -297,9 +320,9 @@ private int processMissingIndexes( final boolean hasOverloaded = !excludedDueToLoad.isEmpty(); final List excludedOrOverloadedNodes = hasOverloaded ? new ArrayList<>(ImmutableSet.builder() - .addAll(excludedNodes) - .addAll(excludedDueToLoad) - .build()) + .addAll(excludedNodes) + .addAll(excludedDueToLoad) + .build()) : excludedNodes; // placement with overloaded nodes excluded @@ -350,12 +373,43 @@ private int processMissingIndexes( } if (0 < targetCount) { usedNodes.addAll(selectedDatanodes); - // TODO - what are we adding all the selected nodes to available - // sources? 
availableSourceNodes.addAll(selectedDatanodes); List sourceDatanodesWithIndex = new ArrayList<>(); - for (Pair src : sources.values()) { + + // If we have more than the required number of data blocks, we can + // prefer IN_SERVICE nodes to avoid overloading decommissioning nodes. + List> sortedSources = + sources.values().stream() + .sorted((p1, p2) -> { + boolean p1InService = + p1.getRight().getOperationalState() == IN_SERVICE; + boolean p2InService = + p2.getRight().getOperationalState() == IN_SERVICE; + if (p1InService && !p2InService) { + return -1; + } + if (!p1InService && p2InService) { + return 1; + } + return 0; + }) + .collect(Collectors.toList()); + + int inServiceCount = 0; + for (Pair src : sortedSources) { + if (src.getRight().getOperationalState() == IN_SERVICE) { + inServiceCount++; + } + } + + for (Pair src : sortedSources) { + // If we have enough in-service nodes to fulfill the reconstruction + // requirements, we skip any out-of-service nodes. + if (inServiceCount >= repConfig.getData() && + src.getRight().getOperationalState() != IN_SERVICE) { + continue; + } sourceDatanodesWithIndex.add( new ReconstructECContainersCommand .DatanodeDetailsAndReplicaIndex( @@ -368,11 +422,6 @@ private int processMissingIndexes( sourceDatanodesWithIndex, selectedDatanodes, integers2ByteString(missingIndexes), repConfig); - // This can throw a CommandTargetOverloadedException, but there is no - // point in retrying here. The sources we picked already have the - // overloaded nodes excluded, so we should not get an overloaded - // exception, but it could happen due to other threads adding work to - // the DNs. If it happens here, we just let the exception bubble up. 
replicationManager.sendThrottledReconstructionCommand( container, reconstructionCommand); for (int i = 0; i < missingIndexes.size(); i++) { @@ -383,7 +432,7 @@ private int processMissingIndexes( } if (targetCount != expectedTargetCount) { LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully reconstruct container {}. Requested {} received {}", + " to fully reconstruct container {}. Requested {} received {}", container.getContainerID(), expectedTargetCount, targetCount); if (hasOverloaded && recoveryIsCritical) { metrics.incrECPartialReconstructionCriticalTotal(); @@ -400,8 +449,6 @@ private int processMissingIndexes( + " {}. Available sources are: {}", container.containerID(), repConfig.getData(), sources.size(), sources); } - LOG.trace("Sent {} commands for container {}.", commandsSent, - container.containerID()); return commandsSent; } @@ -429,71 +476,88 @@ private int processDecommissioningIndexes( throws IOException { ContainerInfo container = replicaCount.getContainer(); Set decomIndexes = replicaCount.decommissioningOnlyIndexes(true); - int commandsSent = 0; - if (!decomIndexes.isEmpty()) { - LOG.debug("Processing decommissioning indexes {} for container {}.", - decomIndexes, container.containerID()); - final List selectedDatanodes = getTargetDatanodes( - container, decomIndexes.size(), usedNodes, excludedNodes); + if (decomIndexes.isEmpty()) { + return 0; + } - ContainerPlacementStatus placementStatusWithSelectedTargets = - validatePlacement(container, availableSourceNodes, selectedDatanodes); - if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { - LOG.debug("Target nodes + existing nodes for EC container {}" + - " will not satisfy placement policy {}. Reason: {}. Selected" + - " nodes: {}. Available source nodes: {}. 
Resuming recovery " + - "regardless.", - container.containerID(), containerPlacement.getClass().getName(), - placementStatusWithSelectedTargets.misReplicatedReason(), - selectedDatanodes, availableSourceNodes); + if (replicationManager.getConfig().isEcDecommissionReconstructionEnabled()) { + for (Integer index : decomIndexes) { + Pair source = sources.get(index); + if (source != null && replicationManager.isNodeHighlyLoaded( + source.getLeft().getDatanodeDetails())) { + LOG.info("Source node {} is highly loaded, switching to " + + "reconstruction for decommissioning container {}", + source.getLeft().getDatanodeDetails(), container.containerID()); + return processReconstruction(replicaCount, + new ArrayList<>(decomIndexes), sources, availableSourceNodes, + excludedNodes, usedNodes); + } } + } - usedNodes.addAll(selectedDatanodes); - Iterator iterator = selectedDatanodes.iterator(); - // In this case we need to do one to one copy. - CommandTargetOverloadedException overloadedException = null; - for (Integer decomIndex : decomIndexes) { - Pair source = sources.get(decomIndex); - if (source == null) { - LOG.warn("Cannot find source replica for decommissioning index " + - "{} in container {}", decomIndex, container.containerID()); - continue; - } - ContainerReplica sourceReplica = source.getLeft(); - if (!iterator.hasNext()) { - LOG.warn("Couldn't find enough targets. 
Available source" - + " nodes: {}, the target nodes: {}, excluded nodes: {}," - + " usedNodes: {}, and the decommission indexes: {}", - sources.values().stream() - .map(Pair::getLeft).collect(Collectors.toSet()), - selectedDatanodes, excludedNodes, usedNodes, decomIndexes); - break; - } - try { - createReplicateCommand( - container, iterator, sourceReplica, replicaCount); - commandsSent++; - } catch (CommandTargetOverloadedException e) { - LOG.debug("Unable to send Replicate command for container {}" + - " index {} because the source node {} is overloaded.", - container.getContainerID(), sourceReplica.getReplicaIndex(), - sourceReplica.getDatanodeDetails()); - overloadedException = e; - } + LOG.debug("Processing decommissioning indexes {} for container {}.", + decomIndexes, container.containerID()); + final List selectedDatanodes = getTargetDatanodes( + container, decomIndexes.size(), usedNodes, excludedNodes); + + ContainerPlacementStatus placementStatusWithSelectedTargets = + validatePlacement(container, availableSourceNodes, selectedDatanodes); + if (!placementStatusWithSelectedTargets.isPolicySatisfied()) { + LOG.debug("Target nodes + existing nodes for EC container {}" + + " will not satisfy placement policy {}. Reason: {}. Selected" + + " nodes: {}. Available source nodes: {}. Resuming recovery " + + "regardless.", + container.containerID(), containerPlacement.getClass().getName(), + placementStatusWithSelectedTargets.misReplicatedReason(), + selectedDatanodes, availableSourceNodes); + } + + usedNodes.addAll(selectedDatanodes); + Iterator iterator = selectedDatanodes.iterator(); + // In this case we need to do one to one copy. 
+ int commandsSent = 0; + CommandTargetOverloadedException overloadedException = null; + for (Integer decomIndex : decomIndexes) { + Pair source = sources.get(decomIndex); + if (source == null) { + LOG.warn("Cannot find source replica for decommissioning index " + + "{} in container {}", decomIndex, container.containerID()); + continue; + } + ContainerReplica sourceReplica = source.getLeft(); + if (!iterator.hasNext()) { + LOG.warn("Couldn't find enough targets. Available source" + + " nodes: {}, the target nodes: {}, excluded nodes: {}," + + " usedNodes: {}, and the decommission indexes: {}", + sources.values().stream() + .map(Pair::getLeft).collect(Collectors.toSet()), + selectedDatanodes, excludedNodes, usedNodes, decomIndexes); + break; } - if (overloadedException != null) { - throw overloadedException; + try { + createReplicateCommand( + container, iterator, sourceReplica, replicaCount); + commandsSent++; + } catch (CommandTargetOverloadedException e) { + LOG.debug("Unable to send Replicate command for container {}" + + " index {} because the source node {} is overloaded.", + container.getContainerID(), sourceReplica.getReplicaIndex(), + sourceReplica.getDatanodeDetails()); + overloadedException = e; } + } + if (overloadedException != null) { + throw overloadedException; + } - if (selectedDatanodes.size() != decomIndexes.size()) { - LOG.debug("Insufficient nodes were returned from the placement policy" + - " to fully replicate the decommission indexes for container {}." + - " Requested {} received {}", container.getContainerID(), - decomIndexes.size(), selectedDatanodes.size()); - metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); - throw new InsufficientDatanodesException(decomIndexes.size(), - selectedDatanodes.size()); - } + if (selectedDatanodes.size() != decomIndexes.size()) { + LOG.debug("Insufficient nodes were returned from the placement policy" + + " to fully replicate the decommission indexes for container {}." 
+ + " Requested {} received {}", container.getContainerID(), + decomIndexes.size(), selectedDatanodes.size()); + metrics.incrEcPartialReplicationForOutOfServiceReplicasTotal(); + throw new InsufficientDatanodesException(decomIndexes.size(), + selectedDatanodes.size()); } LOG.trace("Sent {} commands for container {}.", commandsSent, container.containerID()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java index b2696f279dfc..48a7981365f1 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java @@ -462,6 +462,27 @@ public boolean isReconstructionLimitReached() { return limit > 0 && getInflightReconstructionCount() >= limit; } + /** + * Returns true if the given datanode's replication load (queued replication + * and reconstruction commands) exceeds the configured load factor threshold. + * + * @param datanode the datanode to check + * @return true if the node is highly loaded, false otherwise + */ + public boolean isNodeHighlyLoaded(DatanodeDetails datanode) { + try { + int limit = getReplicationLimit(datanode); + if (limit <= 0) { + return true; + } + double loadFactor = (double) getQueuedReplicationCount(datanode) / limit; + return loadFactor >= rmConf.getEcDecommissionReconstructionLoadFactor(); + } catch (NodeNotFoundException e) { + LOG.warn("Node {} not found when checking load factor", datanode, e); + return true; + } + } + /** * Sends delete container command for the given container to the given * datanode. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java index 5d2af561196b..8a00e486fc47 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java @@ -435,6 +435,56 @@ public void testUnderReplicationWithDecomNodesOverloaded() Lists.emptyList(), availableReplicas, 1, 0, policy)); } + @Test + public void testUnderReplicationWithDecomNodesSwitchToReconstruction() + throws IOException { + replicationManager.getConfig().setEcDecommissionReconstructionEnabled(true); + Set availableReplicas = ReplicationTestUtil + .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), + Pair.of(IN_SERVICE, 5)); + + // Mock node 1 as highly loaded + DatanodeDetails decomNode = availableReplicas.stream() + .filter(r -> r.getReplicaIndex() == 1) + .findFirst().get().getDatanodeDetails(); + when(replicationManager.isNodeHighlyLoaded(decomNode)).thenReturn(true); + + ECUnderReplicationHandler ecURH = + new ECUnderReplicationHandler(policy, conf, replicationManager); + UnderReplicatedHealthResult result = + mock(UnderReplicatedHealthResult.class); + when(result.isUnrecoverable()).thenReturn(false); + when(result.getContainerInfo()).thenReturn(container); + + ecURH.processAndSendCommands(availableReplicas, ImmutableList.of(), + result, remainingMaintenanceRedundancy); + + // We expect 1 reconstruction command for index 1, and 0 replicate commands + int replicateCommand = 0; + int reconstructCommand = 0; + for (Pair> dnCommand : commandsSent) { + if (dnCommand.getValue() instanceof ReplicateContainerCommand) { + replicateCommand++; + } 
else if (dnCommand.getValue() instanceof ReconstructECContainersCommand) { + reconstructCommand++; + ReconstructECContainersCommand reconCmd = + (ReconstructECContainersCommand) dnCommand.getValue(); + assertEquals(ECUnderReplicationHandler.integers2ByteString( + ImmutableList.of(1)), reconCmd.getMissingContainerIndexes()); + + // verify source offloading: decomNode should NOT be in the source list + // because we have 4 other IN_SERVICE nodes (DATA=3) + for (ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex src : + reconCmd.getSources()) { + assertNotEquals(decomNode, src.getDnDetails()); + } + } + } + assertEquals(0, replicateCommand); + assertEquals(1, reconstructCommand); + } + @Test public void testUnderReplicationWithDecomIndex12() throws IOException { Set availableReplicas = ReplicationTestUtil From b4e2d4fab059cfbc341736b714b7810dc2556f04 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 21 Apr 2026 14:13:44 -0700 Subject: [PATCH 3/5] HDDS-15073. [DataNode] Add outbound replication limit per volume Change-Id: Ic4e49c06f798cd4a15a630e742e17debba9b7494 (cherry picked from commit aca4193d5ff5ebd84caa485d215532ab6414dc86) --- .../statemachine/DatanodeStateMachine.java | 8 +++-- .../container/common/volume/HddsVolume.java | 16 ++++++++++ .../OnDemandContainerReplicationSource.java | 29 ++++++++++++++++--- .../replication/ReplicationServer.java | 23 ++++++++++++++- .../TestGrpcReplicationService.java | 3 +- .../datanode/container/ExportSubcommand.java | 4 ++- 6 files changed, 73 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 2f53178e9bf9..c068c928ca96 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -195,6 +195,9 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, } nextHB = new AtomicLong(Time.monotonicNow()); + ReplicationConfig replicationConfig = + conf.getObject(ReplicationConfig.class); + ContainerImporter importer = new ContainerImporter(conf, container.getContainerSet(), container.getController(), @@ -205,15 +208,14 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, importer, new SimpleContainerDownloader(conf, certClient)); ContainerReplicator pushReplicator = new PushReplicator(conf, - new OnDemandContainerReplicationSource(container.getController()), + new OnDemandContainerReplicationSource(container.getController(), + replicationConfig), new GrpcContainerUploader(conf, certClient, container.getController()) ); pullReplicatorWithMetrics = new MeasuredReplicator(pullReplicator, "pull"); pushReplicatorWithMetrics = new MeasuredReplicator(pushReplicator, "push"); - ReplicationConfig replicationConfig = - conf.getObject(ReplicationConfig.class); supervisor = ReplicationSupervisor.newBuilder() .stateContext(context) .datanodeConfig(dnConf) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 310c46de5294..e71cd4e2f1ef 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; 
import java.util.function.Function; import org.apache.commons.io.FileUtils; @@ -107,6 +108,9 @@ public class HddsVolume extends StorageVolume { private final AtomicBoolean dbLoaded = new AtomicBoolean(false); private final AtomicBoolean dbLoadFailure = new AtomicBoolean(false); + private final AtomicInteger activeOutboundReplications = + new AtomicInteger(0); + /** * Builder for HddsVolume. */ @@ -629,4 +633,16 @@ public void compactDb() { LOG.warn("compact rocksdb error in {}", dbFilePath, e); } } + + public int incActiveOutboundReplications() { + return activeOutboundReplications.incrementAndGet(); + } + + public int decActiveOutboundReplications() { + return activeOutboundReplications.decrementAndGet(); + } + + public int getActiveOutboundReplications() { + return activeOutboundReplications.get(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java index 422dd370d1fd..2ff991907f6a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java @@ -17,12 +17,14 @@ package org.apache.hadoop.ozone.container.replication; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; import java.io.IOException; import java.io.OutputStream; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import 
org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; @@ -34,10 +36,13 @@ public class OnDemandContainerReplicationSource implements ContainerReplicationSource { private final ContainerController controller; + private final ReplicationServer.ReplicationConfig config; public OnDemandContainerReplicationSource( - ContainerController controller) { + ContainerController controller, + ReplicationServer.ReplicationConfig config) { this.controller = controller; + this.config = config; } @Override @@ -57,8 +62,24 @@ public void copyData(long containerId, OutputStream destination, " is not found.", CONTAINER_NOT_FOUND); } - controller.exportContainer( - container.getContainerType(), containerId, destination, - new TarContainerPacker(compression)); + HddsVolume volume = (HddsVolume) container.getContainerData().getVolume(); + if (volume != null) { + if (volume.getActiveOutboundReplications() >= + config.getVolumeOutboundLimit()) { + throw new StorageContainerException("Volume " + volume.getStorageID() + + " has reached the maximum number of concurrent replication reads (" + + config.getVolumeOutboundLimit() + ")", CONTAINER_INTERNAL_ERROR); + } + volume.incActiveOutboundReplications(); + } + try { + controller.exportContainer( + container.getContainerType(), containerId, destination, + new TarContainerPacker(compression)); + } finally { + if (volume != null) { + volume.decActiveOutboundReplications(); + } + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index 34b5a799548d..d3bb14d8c505 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -63,6 +63,8 @@ public class ReplicationServer { private ContainerController controller; + private ReplicationConfig replicationConfig; + private int port; private final ContainerImporter importer; @@ -75,6 +77,7 @@ public ReplicationServer(ContainerController controller, this.secConf = secConf; this.caClient = caClient; this.controller = controller; + this.replicationConfig = replicationConfig; this.importer = importer; this.port = replicationConfig.getPort(); @@ -103,7 +106,8 @@ public ReplicationServer(ContainerController controller, public void init() { GrpcReplicationService grpcReplicationService = new GrpcReplicationService( - new OnDemandContainerReplicationSource(controller), importer); + new OnDemandContainerReplicationSource(controller, replicationConfig), + importer); NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port) .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE) .addService(ServerInterceptors.intercept( @@ -225,10 +229,27 @@ public static final class ReplicationConfig { ) private double outOfServiceFactor = OUTOFSERVICE_FACTOR_DEFAULT; + @Config(key = "hdds.datanode.replication.volume.outbound.limit", + type = ConfigType.INT, + defaultValue = "2", + tags = {DATANODE}, + description = "The maximum number of concurrent replication reads " + + "allowed per physical disk volume." 
+ ) + private int volumeOutboundLimit = 2; + public double getOutOfServiceFactor() { return outOfServiceFactor; } + public int getVolumeOutboundLimit() { + return volumeOutboundLimit; + } + + public void setVolumeOutboundLimit(int limit) { + this.volumeOutboundLimit = limit; + } + public int scaleOutOfServiceLimit(int original) { return (int) Math.ceil(original * outOfServiceFactor); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java index 8b831fa06466..5ab7ccc63577 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java @@ -191,7 +191,8 @@ public void testDownload() throws IOException { @Test public void testUpload() { ContainerReplicationSource source = - new OnDemandContainerReplicationSource(containerController); + new OnDemandContainerReplicationSource(containerController, + new ReplicationServer.ReplicationConfig()); GrpcContainerUploader uploader = new GrpcContainerUploader(conf, null, containerController); diff --git a/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java b/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java index d06633f14408..78f449c7a2a5 100644 --- a/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java +++ b/hadoop-ozone/cli-debug/src/main/java/org/apache/hadoop/ozone/debug/datanode/container/ExportSubcommand.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import 
org.apache.hadoop.ozone.container.replication.ContainerReplicationSource; import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; +import org.apache.hadoop.ozone.container.replication.ReplicationServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import picocli.CommandLine; @@ -66,7 +67,8 @@ public Void call() throws Exception { parent.loadContainersFromVolumes(); final ContainerReplicationSource replicationSource = - new OnDemandContainerReplicationSource(parent.getController()); + new OnDemandContainerReplicationSource(parent.getController(), + new ReplicationServer.ReplicationConfig()); for (int i = 0; i < containerCount; i++) { replicationSource.prepare(containerId); From 0385c58b77a64404cc6967a0c2921783c4a93b16 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Thu, 21 May 2026 11:56:24 -0700 Subject: [PATCH 4/5] HDDS-15073. Fix compilation errors and implement replication retry logic. Summary of changes: - Resolved compilation failure in hdds-client by adding missing gRPC metrics and increment methods to XceiverClientMetrics. - Implemented a retry mechanism for container replication when volume-level outbound limits are reached. - Added REPLICATION_LIMIT_REACHED error code to DatanodeClientProtocol.proto. - Updated OnDemandContainerReplicationSource to throw REPLICATION_LIMIT_REACHED when the limit is exceeded. - Updated PushReplicator and DownloadAndImportReplicator to catch the limit-reached error and mark the task as QUEUED. - Modified ReplicationSupervisor to re-enqueue QUEUED tasks with a 1-second delay using a new ScheduledExecutorService. - Enhanced SimpleContainerDownloader to detect RESOURCE_EXHAUSTED gRPC status and propagate it as a replication limit error. - Verified fixes with TestXceiverClientMetricsUnit and existing integration tests in TestDecommissionAndMaintenance. 
Change-Id: I6b27e1a1c422613001531cf26de1adb746d2266e --- .../scm/GrpcClientMetricsInterceptor.java | 68 +++++++++++++++++++ .../hadoop/hdds/scm/XceiverClientMetrics.java | 12 ++++ .../scm/TestXceiverClientMetricsUnit.java | 46 +++++++++++++ .../replication/AbstractReplicationTask.java | 6 +- .../replication/ContainerDownloader.java | 3 +- .../DownloadAndImportReplicator.java | 14 +++- .../replication/GrpcReplicationService.java | 12 ++++ .../OnDemandContainerReplicationSource.java | 12 +++- .../container/replication/PushReplicator.java | 18 +++-- .../replication/ReplicationSupervisor.java | 18 +++++ .../SimpleContainerDownloader.java | 24 ++++++- .../main/proto/DatanodeClientProtocol.proto | 1 + .../ozoneimpl/TestOzoneContainerWithTLS.java | 4 +- 13 files changed, 225 insertions(+), 13 deletions(-) create mode 100644 hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/GrpcClientMetricsInterceptor.java create mode 100644 hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientMetricsUnit.java diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/GrpcClientMetricsInterceptor.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/GrpcClientMetricsInterceptor.java new file mode 100644 index 000000000000..1e78508d950c --- /dev/null +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/GrpcClientMetricsInterceptor.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm; + +import org.apache.ratis.thirdparty.io.grpc.CallOptions; +import org.apache.ratis.thirdparty.io.grpc.Channel; +import org.apache.ratis.thirdparty.io.grpc.ClientCall; +import org.apache.ratis.thirdparty.io.grpc.ClientInterceptor; +import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import org.apache.ratis.thirdparty.io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import org.apache.ratis.thirdparty.io.grpc.Metadata; +import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; +import org.apache.ratis.thirdparty.io.grpc.Status; + +/** + * Interceptor to capture gRPC level metrics. 
+ */ +public class GrpcClientMetricsInterceptor implements ClientInterceptor { + + private final XceiverClientMetrics metrics; + + public GrpcClientMetricsInterceptor(XceiverClientMetrics metrics) { + this.metrics = metrics; + } + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, + Channel next) { + + return new SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + + @Override + public void start(Listener responseListener, Metadata headers) { + super.start(new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onClose(Status status, Metadata trailers) { + if (!status.isOk()) { + if (status.getCode() == Status.Code.UNAUTHENTICATED) { + metrics.incGrpcAuthenticationFailures(); + } else if (status.getCode() == Status.Code.UNAVAILABLE || + status.getCode() == Status.Code.DEADLINE_EXCEEDED) { + metrics.incGrpcConnectionFailures(); + } + } + super.onClose(status, trailers); + } + }, headers); + } + }; + } +} diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java index 1402ea4de640..fafde16d02cc 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientMetrics.java @@ -48,6 +48,8 @@ public class XceiverClientMetrics implements MetricsSource { private @Metric MutableCounterLong totalOps; private @Metric MutableCounterLong ecReconstructionTotal; private @Metric MutableCounterLong ecReconstructionFailsTotal; + private @Metric MutableCounterLong grpcAuthenticationFailures; + private @Metric MutableCounterLong grpcConnectionFailures; private EnumMap pendingOpsArray; private EnumMap opsArray; private EnumMap containerOpsLatency; @@ -115,6 +117,14 @@ public void incECReconstructionFailsTotal() { ecReconstructionFailsTotal.incr(); } + 
public void incGrpcAuthenticationFailures() { + grpcAuthenticationFailures.incr(); + } + + public void incGrpcConnectionFailures() { + grpcConnectionFailures.incr(); + } + @VisibleForTesting public long getTotalOpCount() { return totalOps.value(); @@ -144,6 +154,8 @@ public void getMetrics(MetricsCollector collector, boolean b) { totalOps.snapshot(recordBuilder, true); ecReconstructionTotal.snapshot(recordBuilder, true); ecReconstructionFailsTotal.snapshot(recordBuilder, true); + grpcAuthenticationFailures.snapshot(recordBuilder, true); + grpcConnectionFailures.snapshot(recordBuilder, true); for (ContainerProtos.Type type : ContainerProtos.Type.values()) { pendingOpsArray.get(type).snapshot(recordBuilder, b); diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientMetricsUnit.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientMetricsUnit.java new file mode 100644 index 000000000000..e77dc4b567e0 --- /dev/null +++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/TestXceiverClientMetricsUnit.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm; + +import static org.apache.ozone.test.MetricsAsserts.assertCounter; +import static org.apache.ozone.test.MetricsAsserts.getMetrics; + +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.junit.jupiter.api.Test; + +/** + * Unit test for XceiverClientMetrics. + */ +public class TestXceiverClientMetricsUnit { + + @Test + public void testGrpcFailures() { + XceiverClientMetrics metrics = XceiverClientMetrics.create(); + try { + metrics.incGrpcAuthenticationFailures(); + metrics.incGrpcConnectionFailures(); + metrics.incGrpcConnectionFailures(); + + MetricsRecordBuilder recordBuilder = getMetrics(XceiverClientMetrics.SOURCE_NAME); + assertCounter("GrpcAuthenticationFailures", 1L, recordBuilder); + assertCounter("GrpcConnectionFailures", 2L, recordBuilder); + } finally { + metrics.unRegister(); + } + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index 05932e6edf79..d7dc9078b5f7 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -34,7 +34,7 @@ public abstract class AbstractReplicationTask { private final long containerId; - private final Instant queued; + private Instant queued; private final long deadlineMsSinceEpoch; @@ -78,6 +78,10 @@ public Instant getQueued() { return queued; } + public void updateQueuedTime() { + this.queued = Instant.now(); + } + public long getTerm() { return term; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java index 28879ffde18d..535aced9ed86 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerDownloader.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.container.replication; import java.io.Closeable; +import java.io.IOException; import java.nio.file.Path; import java.util.List; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -34,6 +35,6 @@ public interface ContainerDownloader extends Closeable { Path getContainerDataFromReplicas(long containerId, List sources, Path downloadDir, - CopyContainerCompression compression); + CopyContainerCompression compression) throws IOException; } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 2457b592b141..491086b27516 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -23,6 +23,8 @@ import java.util.List; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; @@ -96,8 
+98,16 @@ public void replicate(ReplicationTask task) { LOG.info("Container {} is replicated successfully", containerID); task.setStatus(Status.DONE); } catch (IOException e) { - LOG.error("Container {} replication was unsuccessful.", containerID, e); - task.setStatus(Status.FAILED); + if (e instanceof StorageContainerException && + ((StorageContainerException) e).getResult() == + ContainerProtos.Result.REPLICATION_LIMIT_REACHED) { + LOG.info("Container {} replication will be retried as the " + + "replication limit was reached.", containerID); + task.setStatus(Status.QUEUED); + } else { + LOG.error("Container {} replication was unsuccessful.", containerID, e); + task.setStatus(Status.FAILED); + } } finally { if (targetVolume != null) { targetVolume.incCommittedBytes(-containerImporter.getDefaultReplicationSpace()); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java index 10cba29845f3..dd6ba222ac29 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java @@ -17,6 +17,7 @@ package org.apache.hadoop.ozone.container.replication; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.REPLICATION_LIMIT_REACHED; import static org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.getDownloadMethod; import static org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc.getUploadMethod; import static org.apache.hadoop.ozone.container.replication.CopyContainerCompression.fromProto; @@ -30,12 +31,14 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest; import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse; import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.ratis.grpc.util.ZeroCopyMessageMarshaller; import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite; import org.apache.ratis.thirdparty.io.grpc.MethodDescriptor; import org.apache.ratis.thirdparty.io.grpc.ServerCallHandler; import org.apache.ratis.thirdparty.io.grpc.ServerServiceDefinition; +import org.apache.ratis.thirdparty.io.grpc.Status; import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver; import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; @@ -132,6 +135,15 @@ public void download(CopyContainerRequestProto request, (CallStreamObserver) responseObserver, containerID, BUFFER_SIZE); source.copyData(containerID, outputStream, compression); + } catch (StorageContainerException e) { + if (e.getResult() == REPLICATION_LIMIT_REACHED) { + responseObserver.onError(Status.RESOURCE_EXHAUSTED + .withDescription(e.getMessage()) + .asRuntimeException()); + } else { + LOG.warn("Error streaming container {}", containerID, e); + responseObserver.onError(e); + } } catch (IOException e) { LOG.warn("Error streaming container {}", containerID, e); responseObserver.onError(e); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java index 2ff991907f6a..f7013bf01360 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/OnDemandContainerReplicationSource.java @@ -17,8 +17,8 @@ package org.apache.hadoop.ozone.container.replication; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_NOT_FOUND; +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.REPLICATION_LIMIT_REACHED; import java.io.IOException; import java.io.OutputStream; @@ -27,6 +27,8 @@ import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A naive implementation of the replication source which creates a tar file @@ -35,6 +37,9 @@ public class OnDemandContainerReplicationSource implements ContainerReplicationSource { + private static final Logger LOG = + LoggerFactory.getLogger(OnDemandContainerReplicationSource.class); + private final ContainerController controller; private final ReplicationServer.ReplicationConfig config; @@ -66,9 +71,12 @@ public void copyData(long containerId, OutputStream destination, if (volume != null) { if (volume.getActiveOutboundReplications() >= config.getVolumeOutboundLimit()) { + LOG.info("Volume {} has reached the maximum number of concurrent " + + "replication reads ({})", volume.getStorageID(), + config.getVolumeOutboundLimit()); throw new StorageContainerException("Volume " + volume.getStorageID() + " has reached the maximum number of concurrent replication reads (" - + config.getVolumeOutboundLimit() + ")", CONTAINER_INTERNAL_ERROR); + + config.getVolumeOutboundLimit() + ")", REPLICATION_LIMIT_REACHED); } volume.incActiveOutboundReplications(); } diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java index 759aff722baf..faac1d9e86e5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java @@ -21,6 +21,8 @@ import org.apache.commons.io.output.CountingOutputStream; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.utils.IOUtils; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.slf4j.Logger; @@ -68,11 +70,19 @@ public void replicate(ReplicationTask task) { task.setTransferredBytes(output.getByteCount()); task.setStatus(Status.DONE); } catch (Exception e) { - LOG.warn("Container {} replication was unsuccessful.", containerID, e); - if (output != null) { - task.setTransferredBytes(output.getByteCount()); + if (e instanceof StorageContainerException && + ((StorageContainerException) e).getResult() == + ContainerProtos.Result.REPLICATION_LIMIT_REACHED) { + LOG.info("Container {} replication will be retried as the " + + "replication limit was reached.", containerID); + task.setStatus(Status.QUEUED); + } else { + LOG.warn("Container {} replication was unsuccessful.", containerID, e); + if (output != null) { + task.setTransferredBytes(output.getByteCount()); + } + task.setStatus(Status.FAILED); } - task.setStatus(Status.FAILED); } finally { // output may have already been closed, ignore such errors IOUtils.cleanupWithLogger(LOG, output); diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 8dee840db226..52a25409cc2e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -34,7 +34,9 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -70,6 +72,7 @@ public final class ReplicationSupervisor { .thenComparing(TaskRunner::getTaskQueueTime); private final ExecutorService executor; + private final ScheduledExecutorService scheduler; private final StateContext context; private final Clock clock; @@ -210,6 +213,11 @@ private ReplicationSupervisor(StateContext context, ExecutorService executor, this.inFlight = ConcurrentHashMap.newKeySet(); this.context = context; this.executor = executor; + this.scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("ReplicationSupervisor-Scheduler") + .build()); this.replicationConfig = replicationConfig; this.datanodeConfig = datanodeConfig; maxQueueSize = datanodeConfig.getCommandQueueLimit(); @@ -293,12 +301,18 @@ private void decrementTaskCounter(AbstractReplicationTask task) { @VisibleForTesting public void shutdownAfterFinish() throws InterruptedException { + scheduler.shutdown(); + scheduler.awaitTermination(1L, TimeUnit.DAYS); executor.shutdown(); executor.awaitTermination(1L, 
TimeUnit.DAYS); } public void stop() { try { + scheduler.shutdown(); + if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } executor.shutdown(); if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { executor.shutdownNow(); @@ -428,6 +442,10 @@ public void run() { opsLatencyMs.get(task.getMetricName()).add(Time.monotonicNow() - startTime); inFlight.remove(task); decrementTaskCounter(task); + if (task.getStatus() == Status.QUEUED) { + task.updateQueuedTime(); + scheduler.schedule(() -> addTask(task), 1, TimeUnit.SECONDS); + } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java index 145d63680c23..b5dfb265fb46 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SimpleContainerDownloader.java @@ -25,12 +25,17 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.DatanodeDetails.Port.Name; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.hdds.security.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.utils.IOUtils; +import org.apache.ratis.thirdparty.io.grpc.Status; +import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +63,8 
@@ public SimpleContainerDownloader( @Override public Path getContainerDataFromReplicas( long containerId, List sourceDatanodes, - Path downloadDir, CopyContainerCompression compression) { + Path downloadDir, CopyContainerCompression compression) + throws IOException { if (downloadDir == null) { downloadDir = Paths.get(System.getProperty("java.io.tmpdir")) @@ -68,6 +74,7 @@ public Path getContainerDataFromReplicas( final List shuffledDatanodes = shuffleDatanodes(sourceDatanodes); + int resourceExhaustedCount = 0; for (int i = 0; i < shuffledDatanodes.size(); i++) { DatanodeDetails datanode = shuffledDatanodes.get(i); GrpcReplicationClient client = null; @@ -79,12 +86,27 @@ public Path getContainerDataFromReplicas( } catch (InterruptedException e) { logError(e, containerId, datanode, i, shuffledDatanodes.size()); Thread.currentThread().interrupt(); + throw new IOException("Interrupted during container download", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof StatusRuntimeException && + ((StatusRuntimeException) e.getCause()).getStatus().getCode() == + Status.Code.RESOURCE_EXHAUSTED) { + resourceExhaustedCount++; + } + logError(e, containerId, datanode, i, shuffledDatanodes.size()); } catch (Exception e) { logError(e, containerId, datanode, i, shuffledDatanodes.size()); } finally { IOUtils.close(LOG, client); } } + + if (resourceExhaustedCount > 0) { + throw new StorageContainerException("All sources are busy or failed " + + "for container " + containerId, + ContainerProtos.Result.REPLICATION_LIMIT_REACHED); + } + LOG.error("Container {} could not be downloaded from any datanode", containerId); return null; diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index 05c94624c990..bc6e4f7e250f 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -139,6 +139,7 @@ enum Result { IMPORT_CONTAINER_METADATA_FAILED = 46; BLOCK_ALREADY_FINALIZED = 47; CONTAINER_ID_MISMATCH = 48; + REPLICATION_LIMIT_REACHED = 49; } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java index af42ce3b7527..641f9769cd79 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java @@ -398,7 +398,7 @@ private OzoneContainer createAndStartOzoneContainerInstance() { } private void assertDownloadContainerFails(long containerId, - List sourceDatanodes) { + List sourceDatanodes) throws IOException { LogCapturer logCapture = captureLogs(SimpleContainerDownloader.class); SimpleContainerDownloader downloader = new SimpleContainerDownloader(conf, caClient); @@ -411,7 +411,7 @@ private void assertDownloadContainerFails(long containerId, } private void assertDownloadContainerWorks(List containers, - List sourceDatanodes) { + List sourceDatanodes) throws IOException { for (Long cId : containers) { SimpleContainerDownloader downloader = new SimpleContainerDownloader(conf, caClient); From e90008a9c2662e2e4ab6cfeed6798c0b03ccce10 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Tue, 23 Jun 2026 13:18:31 -0700 Subject: [PATCH 5/5] HDDS-15412. [DataNode] Disk volume-specific container replication thread pool. 
Change-Id: I1221186f92020585ad5107829660a635a4366ffd --- .../statemachine/DatanodeStateMachine.java | 20 +++ .../replication/AbstractReplicationTask.java | 11 ++ .../replication/ContainerImporter.java | 2 +- .../DownloadAndImportReplicator.java | 10 +- .../replication/ReplicationSupervisor.java | 135 +++++++++++++++++- .../replication/ReplicationTask.java | 2 +- .../TestReplicationSupervisor.java | 78 ++++++++++ 7 files changed, 246 insertions(+), 12 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index c068c928ca96..73a4f5793ea1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -52,6 +52,7 @@ import org.apache.hadoop.ozone.HddsDatanodeStopService; import org.apache.hadoop.ozone.container.checksum.DNContainerOperationClient; import org.apache.hadoop.ozone.container.common.DatanodeLayoutStorage; +import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.report.ReportManager; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CloseContainerCommandHandler; @@ -66,6 +67,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.RefreshVolumeUsageCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ReplicateContainerCommandHandler; import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.SetNodeOperationalStateCommandHandler; +import 
org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionCoordinator; import org.apache.hadoop.ozone.container.ec.reconstruction.ECReconstructionMetrics; @@ -80,6 +82,7 @@ import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisor; import org.apache.hadoop.ozone.container.replication.ReplicationSupervisorMetrics; +import org.apache.hadoop.ozone.container.replication.ReplicationTask; import org.apache.hadoop.ozone.container.replication.SimpleContainerDownloader; import org.apache.hadoop.ozone.container.upgrade.DataNodeUpgradeFinalizer; import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; @@ -221,6 +224,23 @@ public DatanodeStateMachine(HddsDatanodeService hddsDatanodeService, .datanodeConfig(dnConf) .replicationConfig(replicationConfig) .clock(clock) + .volumeChooser(task -> { + if (task instanceof ReplicationTask) { + ReplicationTask repTask = (ReplicationTask) task; + if (repTask.getTarget() == null) { + try { + return importer.chooseNextVolume(importer.getDefaultReplicationSpace()); + } catch (IOException e) { + LOG.error("Failed to choose volume for replication task " + task, e); + return null; + } + } else { + Container localContainer = getContainer().getContainerSet().getContainer(task.getContainerId()); + return localContainer != null ? 
(HddsVolume) localContainer.getContainerData().getVolume() : null; + } + } + return null; + }) .build(); replicationSupervisorMetrics = diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java index d7dc9078b5f7..6a55c93d72e8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/AbstractReplicationTask.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.time.ZoneId; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ReplicationCommandPriority; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; /** * Abstract class to capture common variables and methods for different types @@ -44,6 +45,8 @@ public abstract class AbstractReplicationTask { private boolean shouldOnlyRunOnInServiceDatanodes = true; + private HddsVolume volume; + protected AbstractReplicationTask(long containerID, long deadlineMsSinceEpoch, long term) { this(containerID, deadlineMsSinceEpoch, term, @@ -156,6 +159,14 @@ public String toString() { return sb.toString(); } + public HddsVolume getVolume() { + return volume; + } + + public void setVolume(HddsVolume volume) { + this.volume = volume; + } + /** * ENUM representing the different status values a replication task can * have. 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java index 7b42006b2293..ff57dc8d43f8 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ContainerImporter.java @@ -146,7 +146,7 @@ private static void deleteFileQuietely(Path tarFilePath) { } } - HddsVolume chooseNextVolume(long spaceToReserve) throws IOException { + public HddsVolume chooseNextVolume(long spaceToReserve) throws IOException { // Choose volume that can hold both container in tmp and dest directory LOG.debug("Choosing volume to reserve space : {}", spaceToReserve); return volumeChoosingPolicy.chooseVolume( diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java index 491086b27516..769b49afec71 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java @@ -72,11 +72,13 @@ public void replicate(ReplicationTask task) { LOG.info("Starting replication of container {} from {} using {}", containerID, sourceDatanodes, compression); - HddsVolume targetVolume = null; - + HddsVolume targetVolume = task.getVolume(); try { - targetVolume = containerImporter.chooseNextVolume( - containerImporter.getDefaultReplicationSpace()); + if (targetVolume == null) { + targetVolume = containerImporter.chooseNextVolume( + containerImporter.getDefaultReplicationSpace()); + 
task.setVolume(targetVolume); + } // Wait for the download. This thread pool is limiting the parallel // downloads, so it's ok to block here and wait for the full download. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index 7fc0edb684f3..f477b6ee71e6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -28,6 +28,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.OptionalLong; @@ -43,6 +45,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.function.IntConsumer; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -53,6 +56,10 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status; import org.apache.hadoop.ozone.container.replication.ReplicationServer.ReplicationConfig; import 
org.apache.hadoop.util.Time; @@ -71,6 +78,10 @@ public final class ReplicationSupervisor { Comparator.comparing(TaskRunner::getTaskPriority) .thenComparing(TaskRunner::getTaskQueueTime); + private final Function volumeChooser; + private final Map volumeExecutors = new ConcurrentHashMap<>(); + private volatile int currentThreadCount; + private final ExecutorService executor; private final ScheduledExecutorService scheduler; private final StateContext context; @@ -121,6 +132,13 @@ public static class Builder { private Clock clock; private IntConsumer executorThreadUpdater = threadCount -> { }; + private Function volumeChooser; + + public Builder volumeChooser( + Function newVolumeChooser) { + volumeChooser = newVolumeChooser; + return this; + } public Builder clock(Clock newClock) { clock = newClock; @@ -195,7 +213,7 @@ public ReplicationSupervisor build() { } return new ReplicationSupervisor(context, executor, replicationConfig, - datanodeConfig, clock, executorThreadUpdater); + datanodeConfig, clock, executorThreadUpdater, volumeChooser); } } @@ -209,7 +227,8 @@ public static Map getMetricsMap() { private ReplicationSupervisor(StateContext context, ExecutorService executor, ReplicationConfig replicationConfig, DatanodeConfiguration datanodeConfig, - Clock clock, IntConsumer executorThreadUpdater) { + Clock clock, IntConsumer executorThreadUpdater, + Function volumeChooser) { this.inFlight = ConcurrentHashMap.newKeySet(); this.context = context; this.executor = executor; @@ -223,6 +242,8 @@ private ReplicationSupervisor(StateContext context, ExecutorService executor, maxQueueSize = datanodeConfig.getCommandQueueLimit(); this.clock = clock; this.executorThreadUpdater = executorThreadUpdater; + this.volumeChooser = volumeChooser; + this.currentThreadCount = replicationConfig.getReplicationMaxStreams(); // set initial state if (context != null) { @@ -240,6 +261,9 @@ private ReplicationSupervisor(StateContext context, ExecutorService executor, public void 
addTask(AbstractReplicationTask task) { if (queueHasRoomFor(task)) { initCounters(task); + if (task.getVolume() == null && volumeChooser != null) { + task.setVolume(volumeChooser.apply(task)); + } addToQueue(task); } } @@ -283,7 +307,70 @@ private void addToQueue(AbstractReplicationTask task) { k -> new AtomicInteger()).incrementAndGet(); } queuedCounter.get(task.getMetricName()).incrementAndGet(); - executor.execute(new TaskRunner(task)); + getExecutorForTask(task).execute(new TaskRunner(task)); + } + } + + private ExecutorService getExecutorForTask(AbstractReplicationTask task) { + cleanupFailedVolumeExecutors(); + HddsVolume volume = task.getVolume(); + if (volume != null) { + return getOrCreateVolumeExecutor(volume); + } + return executor; + } + + private synchronized ThreadPoolExecutor getOrCreateVolumeExecutor(HddsVolume volume) { + return volumeExecutors.computeIfAbsent(volume, v -> { + String threadNamePrefix = context != null ? context.getThreadNamePrefix() : ""; + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat(threadNamePrefix + "ContainerReplicationThread-" + v.getStorageID() + "-%d") + .build(); + ThreadPoolExecutor tpe = new ThreadPoolExecutor( + currentThreadCount, + currentThreadCount, + 60, TimeUnit.SECONDS, + new PriorityBlockingQueue<>(), + threadFactory); + LOG.info("Created replication executor for volume {} with size {}", v.getStorageID(), currentThreadCount); + return tpe; + }); + } + + private synchronized void cleanupFailedVolumeExecutors() { + if (context == null) { + return; + } + OzoneContainer container = context.getParent().getContainer(); + if (container == null) { + return; + } + MutableVolumeSet volumeSet = container.getVolumeSet(); + if (volumeSet == null) { + return; + } + + List healthyVolumes = volumeSet.getVolumesList(); + Iterator> it = volumeExecutors.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + HddsVolume volume = entry.getKey(); + if 
(!healthyVolumes.contains(volume)) { + LOG.info("Volume {} is no longer healthy/present. Shutting down its replication thread pool.", + volume.getStorageID()); + ThreadPoolExecutor executorToShutdown = entry.getValue(); + executorToShutdown.shutdown(); + try { + if (!executorToShutdown.awaitTermination(3, TimeUnit.SECONDS)) { + executorToShutdown.shutdownNow(); + } + } catch (InterruptedException e) { + executorToShutdown.shutdownNow(); + Thread.currentThread().interrupt(); + } + it.remove(); + } } } @@ -303,6 +390,13 @@ private void decrementTaskCounter(AbstractReplicationTask task) { public void shutdownAfterFinish() throws InterruptedException { scheduler.shutdown(); scheduler.awaitTermination(1L, TimeUnit.DAYS); + for (ThreadPoolExecutor tpe : volumeExecutors.values()) { + tpe.shutdown(); + } + for (ThreadPoolExecutor tpe : volumeExecutors.values()) { + tpe.awaitTermination(1L, TimeUnit.DAYS); + } + volumeExecutors.clear(); executor.shutdown(); executor.awaitTermination(1L, TimeUnit.DAYS); } @@ -313,6 +407,20 @@ public void stop() { if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) { scheduler.shutdownNow(); } + for (ThreadPoolExecutor tpe : volumeExecutors.values()) { + tpe.shutdown(); + } + for (ThreadPoolExecutor tpe : volumeExecutors.values()) { + try { + if (!tpe.awaitTermination(3, TimeUnit.SECONDS)) { + tpe.shutdownNow(); + } + } catch (InterruptedException e) { + tpe.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + volumeExecutors.clear(); executor.shutdown(); if (!executor.awaitTermination(3, TimeUnit.SECONDS)) { executor.shutdownNow(); @@ -384,7 +492,19 @@ private void resize(HddsProtos.NodeOperationalState nodeState) { newMaxQueueSize); maxQueueSize = newMaxQueueSize; + currentThreadCount = threadCount; executorThreadUpdater.accept(threadCount); + for (Map.Entry entry : volumeExecutors.entrySet()) { + ThreadPoolExecutor tpe = entry.getValue(); + if (threadCount < tpe.getCorePoolSize()) { + tpe.setCorePoolSize(threadCount); + 
tpe.setMaximumPoolSize(threadCount); + } else { + tpe.setMaximumPoolSize(threadCount); + tpe.setCorePoolSize(threadCount); + } + LOG.info("Scaled replication executor for volume {} to size {}", entry.getKey().getStorageID(), threadCount); + } } /** @@ -505,11 +625,14 @@ public long getReplicationRequestCount(String metricsName) { } public long getQueueSize() { + long size = 0; if (executor instanceof ThreadPoolExecutor) { - return ((ThreadPoolExecutor)executor).getQueue().size(); - } else { - return 0; + size += ((ThreadPoolExecutor) executor).getQueue().size(); + } + for (ThreadPoolExecutor tpe : volumeExecutors.values()) { + size += tpe.getQueue().size(); } + return size; } public long getMaxReplicationStreams() { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java index a32e9b41ab1b..694a1794f587 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationTask.java @@ -125,7 +125,7 @@ public void setTransferredBytes(long transferredBytes) { this.transferredBytes = transferredBytes; } - DatanodeDetails getTarget() { + public DatanodeDetails getTarget() { return cmd.getTargetDatanode(); } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java index a8b590e671e8..cb9e5df96bbb 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java +++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java @@ -32,6 +32,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyList; @@ -56,6 +58,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.SortedMap; import java.util.UUID; import java.util.concurrent.AbstractExecutorService; @@ -109,6 +112,7 @@ import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.ReconcileContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; @@ -1226,4 +1230,78 @@ private void scheduleTasks( rs.addTask(new ReplicationTask(fromSources(i, sources), noopReplicator)); } } + + @ContainerLayoutTestInfo.ContainerTest + public void testVolumeSpecificThreadPoolAndCleanup(ContainerLayoutVersion layout) throws Exception { + this.layoutVersion = layout; + replicatorRef.set(slowReplicator); + DatanodeStateMachine stateMachine = context.getParent(); + OzoneContainer ozoneContainer = mock(OzoneContainer.class); + MutableVolumeSet volumeSet = mock(MutableVolumeSet.class); + when(stateMachine.getContainer()).thenReturn(ozoneContainer); + when(ozoneContainer.getVolumeSet()).thenReturn(volumeSet); 
+ + HddsVolume vol1 = mock(HddsVolume.class); + when(vol1.getStorageID()).thenReturn("vol-1"); + HddsVolume vol2 = mock(HddsVolume.class); + when(vol2.getStorageID()).thenReturn("vol-2"); + + List healthyVolumes = new ArrayList<>(); + healthyVolumes.add(vol1); + healthyVolumes.add(vol2); + when(volumeSet.getVolumesList()).thenReturn(healthyVolumes); + + // Build supervisor with volume chooser + ReplicationSupervisor supervisor = ReplicationSupervisor.newBuilder() + .stateContext(context) + .clock(clock) + .volumeChooser(task -> { + if (task.getContainerId() == 1L || task.getContainerId() == 4L) { + return vol1; + } else if (task.getContainerId() == 2L) { + return vol2; + } + return null; // Fallback + }) + .build(); + + supervisor.setReplicationMaxStreams(1); + + try { + AbstractReplicationTask task1 = createTask(1L); + AbstractReplicationTask task2 = createTask(2L); + AbstractReplicationTask task3 = createTask(3L); // No volume + + supervisor.addTask(task1); + supervisor.addTask(task2); + supervisor.addTask(task3); + + // Verify tasks got correct volume assigned + assertEquals(vol1, task1.getVolume()); + assertEquals(vol2, task2.getVolume()); + assertNull(task3.getVolume()); + + // Now, simulate vol2 failure (remove it from healthyVolumes) + healthyVolumes.remove(vol2); + + // Run cleanup and queue task4 on vol1 (which already has task1 running, so task4 will queue) + AbstractReplicationTask task4 = createTask(4L); + supervisor.addTask(task4); + + // Since max streams is 1, task1 is executing and task4 is in vol1's executor queue. + // We should see a queue size of at least 1 (potentially more depending on fallback executor state). 
+ assertTrue(supervisor.getQueueSize() >= 1); + + // Thread pool for vol2 should be shut down, while vol1 remains active + java.lang.reflect.Field field = ReplicationSupervisor.class.getDeclaredField("volumeExecutors"); + field.setAccessible(true); + Map volumeExecutors = (Map) field.get(supervisor); + + assertTrue(volumeExecutors.containsKey(vol1)); + assertFalse(volumeExecutors.containsKey(vol2)); + assertEquals(1, volumeExecutors.size()); + } finally { + supervisor.stop(); + } + } }