diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d5fd113a8..966e73210 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -64,6 +64,15 @@ impl BlockProcessingOutcome { }, } } + + /// Returns `true` if the block was successfully processed and can be removed from any import + /// queues or temporary storage. + pub fn sucessfully_processed(&self) -> bool { + match self { + BlockProcessingOutcome::ValidBlock(_) => true, + _ => false, + } + } } pub struct BeaconChain { diff --git a/beacon_node/network/src/sync/simple_sync.rs b/beacon_node/network/src/sync/simple_sync.rs index 7609f5750..b77a976b1 100644 --- a/beacon_node/network/src/sync/simple_sync.rs +++ b/beacon_node/network/src/sync/simple_sync.rs @@ -425,14 +425,15 @@ impl SimpleSync { } pub fn process_import_queue(&mut self, network: &mut NetworkContext) { - let mut blocks: Vec<(Hash256, BeaconBlock, PeerId)> = self + let mut blocks: Vec<(usize, BeaconBlock, PeerId)> = self .import_queue .partials .iter() - .filter_map(|(key, partial)| { + .enumerate() + .filter_map(|(i, partial)| { if let Some(_) = partial.body { let (block, _root) = partial.clone().complete().expect("Body must be Some"); - Some((*key, block, partial.sender.clone())) + Some((i, block, partial.sender.clone())) } else { None } @@ -469,7 +470,7 @@ impl SimpleSync { if !keys_to_delete.is_empty() { info!(self.log, "Processed {} blocks", keys_to_delete.len()); for key in keys_to_delete { - self.import_queue.partials.remove(&key); + self.import_queue.partials.remove(key); } } } @@ -539,7 +540,7 @@ pub struct ImportQueue { /// BeaconChain pub chain: Arc, /// Partially imported blocks, keyed by the root of `BeaconBlockBody`. - pub partials: HashMap, + pub partials: Vec, /// Time before a queue entry is consider state. pub stale_time: Duration, /// Logging @@ -550,7 +551,7 @@ impl ImportQueue { pub fn new(chain: Arc, stale_time: Duration, log: slog::Logger) -> Self { Self { chain, - partials: HashMap::new(), + partials: vec![], stale_time, log, } @@ -561,29 +562,30 @@ impl ImportQueue { /// An entry is stale if it has as a `inserted` time that is more than `self.stale_time` in the /// past. pub fn remove_stale(&mut self) { - let keys: Vec = self + let stale_indices: Vec = self .partials .iter() - .filter_map(|(key, partial)| { + .enumerate() + .filter_map(|(i, partial)| { if partial.inserted + self.stale_time <= Instant::now() { - Some(*key) + Some(i) } else { None } }) .collect(); - if !keys.is_empty() { + if !stale_indices.is_empty() { debug!( self.log, "ImportQueue removing stale entries"; - "stale_count" => keys.len(), + "stale_items" => stale_indices.len(), "stale_time_seconds" => self.stale_time.as_secs() ); } - keys.iter().for_each(|key| { - self.partials.remove(&key); + stale_indices.iter().for_each(|&i| { + self.partials.remove(i); }); } @@ -646,29 +648,39 @@ impl ImportQueue { /// If the header already exists, the `inserted` time is set to `now` and not other /// modifications are made. fn insert_header(&mut self, block_root: Hash256, header: BeaconBlockHeader, sender: PeerId) { - self.partials - .entry(header.block_body_root) - .and_modify(|p| p.inserted = Instant::now()) - .or_insert(PartialBeaconBlock { + if let Some(i) = self + .partials + .iter() + .position(|p| p.block_root == block_root) + { + self.partials[i].inserted = Instant::now(); + } else { + self.partials.push(PartialBeaconBlock { block_root, header, body: None, inserted: Instant::now(), sender, - }); + }) + } } /// Updates an existing partial with the `body`. /// /// If there is no header for the `body`, the body is simply discarded. + /// + /// If the body already existed, the `inserted` time is set to `now`. fn insert_body(&mut self, body: BeaconBlockBody, sender: PeerId) { let body_root = Hash256::from_slice(&body.hash_tree_root()[..]); - self.partials.entry(body_root).and_modify(|p| { + self.partials.iter_mut().for_each(|mut p| { if body_root == p.header.block_body_root { - p.body = Some(body); p.inserted = Instant::now(); - p.sender = sender; + + if p.body.is_none() { + p.body = Some(body.clone()); + p.sender = sender.clone(); + } } }); } diff --git a/beacon_node/network/tests/tests.rs b/beacon_node/network/tests/tests.rs index b951d7d2a..2952e5105 100644 --- a/beacon_node/network/tests/tests.rs +++ b/beacon_node/network/tests/tests.rs @@ -511,7 +511,10 @@ fn sync_two_nodes() { // Node A builds out a longer, better chain. for _ in 0..blocks { + // Node A should build a block. node_a.harness.advance_chain_with_block(); + // Node B should just increment it's slot without a block. + node_b.harness.increment_beacon_chain_slot(); } node_a.harness.run_fork_choice();