Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,21 @@ impl Node {
};
let queue = queue_factory.create()?;
let message_queue_adapter = MessageQueueAdapterStdImpl::new(queue);
message_queue_adapter.recover_after_restart(&mc_state)?;

if let Some(last_committed_block_id) =
message_queue_adapter.get_last_committed_mc_block_id()?
{
anyhow::ensure!(
last_committed_block_id == *last_block_id
|| last_committed_block_id.seqno < last_block_id.seqno,
"we commit messages queue only after master block was accepted",
);
}

// We should clear uncommitted queue state because it may contain incorrect diffs
// that were created before node restart. Or applied but not committed diffs.
// We will restore queue strictly above last committed state for applied blocks
message_queue_adapter.clear_uncommitted_state(&mc_state.get_top_shards()?)?;

let slasher = tycho_slasher::Slasher::new(
base.keypair.clone(),
Expand Down
89 changes: 0 additions & 89 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,6 @@ where
partitions: &FastHashSet<QueuePartitionIdx>,
) -> Result<()>;

/// Roll back commit pointers to the specified top blocks.
fn rollback_commit_pointers(
&self,
to_mc_block_id: &BlockId,
to_top_blocks: &[TopBlockIdUpdated],
) -> Result<Vec<ShardIdent>>;

/// Remove all data in uncommitted zone
fn clear_uncommitted_state(
&self,
Expand Down Expand Up @@ -441,88 +434,6 @@ where
Ok(())
}

fn rollback_commit_pointers(
&self,
to_mc_block_id: &BlockId,
to_top_blocks: &[TopBlockIdUpdated],
) -> Result<Vec<ShardIdent>> {
let _global_write_guard = self.global_lock.write();

let old_commit_pointers = self.state.get_commit_pointers()?;
let mut new_commit_pointers = FastHashMap::default();

for item in to_top_blocks {
let block_id = &item.block.block_id;

let diff = self
.state
.get_diff_info(&block_id.shard, block_id.seqno, DiffZone::Both)?;

let diff = match diff {
None if item.updated && item.block.ref_by_mc_seqno > self.zerostate_id.seqno => {
bail!(
"Diff not found for block_id: {} ref {} zerostate {} during commit pointers rollback",
block_id,
item.block.ref_by_mc_seqno,
self.zerostate_id.seqno
)
}
None if !item.updated => {
if let Some(old_pointer) = old_commit_pointers.get(&block_id.shard) {
if old_pointer.seqno != block_id.seqno {
bail!(
"Cannot roll back commit pointer for unchanged shard {} to seqno {}: \
target diff is missing and old pointer seqno is {}",
block_id.shard,
block_id.seqno,
old_pointer.seqno,
);
}
if new_commit_pointers
.insert(block_id.shard, (old_pointer.queue_key, old_pointer.seqno))
.is_some()
{
bail!(
"Duplicate shard in rollback_commit_pointers: {}",
block_id.shard
);
}
}
continue;
}
None => continue,
Some(diff) => diff,
};

if new_commit_pointers
.insert(block_id.shard, (diff.max_message, diff.seqno))
.is_some()
{
bail!(
"Duplicate shard in rollback_commit_pointers: {}",
block_id.shard
);
}
}

let removed_commit_pointer_shards: Vec<_> = old_commit_pointers
.keys()
.filter(|shard_id| !new_commit_pointers.contains_key(shard_id))
.copied()
.collect();

tracing::debug!(target: tracing_targets::MQ,
?new_commit_pointers,
?removed_commit_pointer_shards,
"rollback_commit_pointers",
);

self.state
.replace_commit_pointers(new_commit_pointers, to_mc_block_id)?;

Ok(removed_commit_pointer_shards)
}

fn clear_uncommitted_state(
&self,
partitions: &FastHashSet<QueuePartitionIdx>,
Expand Down
29 changes: 3 additions & 26 deletions collator/src/internal_queue/state/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ pub trait QueueState<V: InternalMessageValue>: Send + Sync {
mc_block_id: &BlockId,
) -> Result<()>;

/// Replace commit pointers and last committed mc block id.
/// ATTENTION! Overrides old value without checks. Should validate the new value in the calling code.
fn replace_commit_pointers(
&self,
commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
mc_block_id: &BlockId,
) -> Result<()>;

/// Load statistics for given partition and ranges
fn load_diff_statistics(
&self,
Expand Down Expand Up @@ -219,18 +211,8 @@ impl<V: InternalMessageValue> QueueState<V> for QueueStateStdImpl {
) -> Result<()> {
let mut tx = self.storage.begin_transaction();
tx.commit_messages(commit_pointers)?;
self.write_commit_transaction(tx, mc_block_id)
}

fn replace_commit_pointers(
&self,
commit_pointers: FastHashMap<ShardIdent, (QueueKey, u32)>,
mc_block_id: &BlockId,
) -> Result<()> {
let old_commit_pointers = self.storage.make_snapshot().read_commit_pointers()?;
let mut tx = self.storage.begin_transaction();
tx.replace_commit_pointers(&old_commit_pointers, commit_pointers)?;
self.write_commit_transaction(tx, mc_block_id)
tx.set_last_committed_mc_block_id(mc_block_id)?;
self.write_transaction(tx)
}

fn load_diff_statistics(
Expand Down Expand Up @@ -396,12 +378,7 @@ impl<V: InternalMessageValue> QueueState<V> for QueueStateStdImpl {
}

impl QueueStateStdImpl {
fn write_commit_transaction(
&self,
mut tx: InternalQueueTransaction,
mc_block_id: &BlockId,
) -> Result<()> {
tx.set_last_committed_mc_block_id(mc_block_id)?;
fn write_transaction(&self, tx: InternalQueueTransaction) -> Result<()> {
tx.write()?;
let db = self.storage.db();
db.rocksdb()
Expand Down
46 changes: 32 additions & 14 deletions collator/src/manager/commit.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::Result;
use tycho_block_util::queue::QueuePartitionIdx;
use tycho_types::models::BlockId;
use tycho_util::FastHashSet;
use tycho_util::metrics::HistogramGuard;

use super::CollationManager;
Expand Down Expand Up @@ -75,6 +77,9 @@ where
let to_blocks_keys = master_block.get_top_blocks_keys()?;
self.blocks_cache.set_gc_to_boundary(&to_blocks_keys);

// check if mc block was not sent to sync
let mut mc_block_was_not_sent_to_sync = true;

// send to sync only if was not received from bc
if matches!(&master_block.data, BlockCacheEntryData::Collated {
received_after_collation: false,
Expand All @@ -83,10 +88,11 @@ where
let histogram =
HistogramGuard::begin("tycho_collator_send_blocks_to_sync_time");

self.send_block_to_sync(master_block.data)?;
mc_block_was_not_sent_to_sync =
!self.send_block_to_sync(master_block.data, Some(partitions.clone()))?;

for shard_block in shard_blocks {
self.send_block_to_sync(shard_block.data)?;
self.send_block_to_sync(shard_block.data, None)?;
}

sync_elapsed = histogram.finish();
Expand All @@ -107,15 +113,21 @@ where
top_processed_to_anchor_to_notify = master_block.top_processed_to_anchor;
}

let _histogram =
HistogramGuard::begin("tycho_collator_send_blocks_to_sync_commit_diffs_time");
// if mc block was not sent to sync by any reason then
// we will not receive OwnBlockApplied event,
// so we have to commit messages queue right now
if mc_block_was_not_sent_to_sync {
let _histogram = HistogramGuard::begin(
"tycho_collator_send_blocks_to_sync_commit_diffs_time",
);

Self::commit_block_queue_diff(
self.mq_adapter.clone(),
&master_block.block_id,
&master_block.top_shard_blocks_info,
&partitions,
)?;
Self::commit_block_queue_diff(
self.mq_adapter.clone(),
&master_block.block_id,
&master_block.top_shard_blocks_info,
&partitions,
)?;
}
}
McBlockSubgraphExtract::AlreadyExtracted => {
tracing::debug!(
Expand Down Expand Up @@ -145,25 +157,31 @@ where
Ok(collator_tasks.unwrap_or_default())
}

fn send_block_to_sync(&self, data: BlockCacheEntryData) -> Result<()> {
fn send_block_to_sync(
&self,
data: BlockCacheEntryData,
queue_partitions: Option<FastHashSet<QueuePartitionIdx>>,
) -> Result<bool> {
let candidate_stuff = match data {
BlockCacheEntryData::Collated {
candidate_stuff,
status,
received_after_collation: false,
..
} if status != CandidateStatus::Synced => candidate_stuff,
_ => return Ok(()),
// do not try to apply block because it is already applied,
// more likely we have synced to some next applied mc block
_ => return Ok(false),
};

let block_id = *candidate_stuff.candidate.block.id();
self.state_node_adapter
.accept_block(candidate_stuff.into_block_for_sync())?;
.accept_block(candidate_stuff.into_block_for_sync(queue_partitions))?;
tracing::debug!(
target: tracing_targets::COLLATION_MANAGER,
"Block was successfully sent to sync ({})",
block_id,
);
Ok(())
Ok(true)
}
}
18 changes: 15 additions & 3 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use crate::types::processed_upto::{
};
use crate::types::{
BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig, DebugDisplayOpt,
DisplayAsShortId, McData, ShardDescriptionShortExt, ShardHashesExt, TopBlockId,
TopBlockIdUpdated,
DisplayAsShortId, McData, ShardDescriptionShortExt, ShardHashesExt, ShardHashesReadExt,
TopBlockId, TopBlockIdUpdated,
};
use crate::utils::block::detect_top_processed_to_anchor;
use crate::utils::vset_cache::ValidatorSetCache;
Expand Down Expand Up @@ -229,10 +229,22 @@ where
break;
}
Some(event) => match event {
StateEvent::OwnBlockApplied { state, processed_upto } => {
StateEvent::OwnBlockApplied { state, processed_upto, queue_partitions } => {
// spawn validation cancellation
mgr.schedule_cancel_validation_sessions_until_block(state.clone());

// commit messages queue
if state.block_id().is_masterchain() {
let _histogram =
HistogramGuard::begin("tycho_collator_send_blocks_to_sync_commit_diffs_time");
Self::commit_block_queue_diff(
mgr.mq_adapter.clone(),
state.block_id(),
&state.shards()?.top_shard_blocks_info()?,
&queue_partitions.expect("should be Some for master block"),
)?;
}

// handle applied block in mempool
mgr.detect_top_processed_to_anchor_and_notify_mempool(
state,
Expand Down
34 changes: 20 additions & 14 deletions collator/src/manager/state_event_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use tokio::sync::mpsc;
use tycho_block_util::queue::QueuePartitionIdx;
use tycho_block_util::state::ShardStateStuff;
use tycho_types::models::{BlockId, ProcessedUptoInfo};
use tycho_util::FastHashSet;

use crate::manager::metrics_report_last_applied_block_and_anchor;
use crate::state_node::StateNodeEventListener;
use crate::state_node::{AcceptedBlockContext, StateNodeEventListener};

pub enum StateEvent {
OwnBlockApplied {
state: ShardStateStuff,
processed_upto: ProcessedUptoInfo,
queue_partitions: Option<FastHashSet<QueuePartitionIdx>>,
},
ExternalBlockApplied {
mc_block_id: BlockId,
Expand All @@ -35,37 +38,40 @@ impl ChannelStateEventListener {

#[async_trait]
impl StateNodeEventListener for ChannelStateEventListener {
async fn on_block_accepted(
&self,
_mc_block_id: &BlockId,
state: &ShardStateStuff,
) -> Result<()> {
async fn on_block_accepted(&self, ctx: AcceptedBlockContext) -> Result<()> {
let AcceptedBlockContext {
state,
queue_partitions,
..
} = ctx;

let processed_upto = state.state().processed_upto.load()?;

metrics_report_last_applied_block_and_anchor(state, &processed_upto)?;
metrics_report_last_applied_block_and_anchor(&state, &processed_upto)?;

self.sender
.send(StateEvent::OwnBlockApplied {
state: state.clone(),
processed_upto,
queue_partitions,
})
.await?;

Ok(())
}

async fn on_block_accepted_external(
&self,
mc_block_id: &BlockId,
state: &ShardStateStuff,
) -> Result<()> {
async fn on_block_accepted_external(&self, ctx: AcceptedBlockContext) -> Result<()> {
let AcceptedBlockContext {
mc_block_id, state, ..
} = ctx;

let processed_upto = state.state().processed_upto.load()?;

metrics_report_last_applied_block_and_anchor(state, &processed_upto)?;
metrics_report_last_applied_block_and_anchor(&state, &processed_upto)?;

self.sender
.send(StateEvent::ExternalBlockApplied {
mc_block_id: *mc_block_id,
mc_block_id,
state: state.clone(),
processed_upto,
})
Expand Down
Loading
Loading