1use crate::{consensus::Digest, epoch::SchemeProvider};
2use alloy_consensus::{BlockHeader, Transaction, transaction::TxHashRef};
3use alloy_primitives::{Address, B256, BlockHash, Bytes, TxHash};
4use alloy_rlp::Decodable;
5use commonware_codec::DecodeExt;
6use commonware_consensus::{
7 Epochable, Reporter, Viewable,
8 marshal::SchemeProvider as _,
9 simplex::{
10 select_leader,
11 signing_scheme::{
12 Scheme as _,
13 bls12381_threshold::{self, Scheme},
14 },
15 types::Activity,
16 },
17 types::Round,
18 utils,
19};
20use commonware_cryptography::{
21 Signer, Verifier,
22 bls12381::primitives::variant::MinSig,
23 ed25519::{PrivateKey, PublicKey, Signature},
24};
25use commonware_p2p::{Receiver, Recipients, Sender};
26use commonware_runtime::{Handle, Metrics, Pacer, Spawner};
27use eyre::{Context, OptionExt};
28use futures::{FutureExt as _, StreamExt, channel::mpsc};
29use indexmap::IndexMap;
30use parking_lot::Mutex;
31use reth_consensus_common::validation::MAX_RLP_BLOCK_SIZE;
32use reth_evm::{Evm, revm::database::State};
33use reth_node_builder::ConfigureEvm;
34use reth_primitives_traits::Recovered;
35use reth_provider::{
36 BlockReader, BlockSource, ProviderError, StateProviderBox, StateProviderFactory,
37};
38use reth_revm::database::StateProviderDatabase;
39use std::{
40 pin::Pin,
41 sync::{Arc, mpsc::RecvError},
42 time::{Duration, Instant},
43};
44use tempo_node::{TempoFullNode, consensus::TEMPO_SHARED_GAS_DIVISOR, evm::evm::TempoEvm};
45use tempo_primitives::{
46 RecoveredSubBlock, SignedSubBlock, SubBlock, SubBlockVersion, TempoTxEnvelope,
47};
48use tokio::sync::broadcast;
49use tracing::{Instrument, Level, Span, debug, instrument, warn};
50
/// Upper bound on the number of transactions kept in the local subblock transaction pool.
///
/// Once the pool holds this many entries, newly received transactions are dropped instead of
/// being stored.
const MAX_SUBBLOCK_TXS: usize = 100_000;
56
/// Configuration consumed by [`Actor::new`].
pub(crate) struct Config<TContext> {
    /// Runtime context used to spawn tasks and to create sleep timers.
    pub(crate) context: TContext,
    /// Key used to sign the subblocks built by this node.
    pub(crate) signer: PrivateKey,
    /// Source of the signing scheme (and its participant set) for a given epoch.
    pub(crate) scheme_provider: SchemeProvider,
    /// Handle to the execution node (block/state provider, EVM config, RPC add-ons).
    pub(crate) node: TempoFullNode,
    /// Fee recipient written into the subblocks built by this node.
    pub(crate) fee_recipient: Address,
    /// Time budget for selecting transactions while building a subblock.
    pub(crate) time_to_build_subblock: Duration,
    /// Delay between re-sends of a built subblock that has not been acknowledged yet.
    pub(crate) subblock_broadcast_interval: Duration,
    /// Epoch length passed to `utils::epoch` to map a block number to its epoch.
    pub(crate) epoch_length: u64,
}
67
/// Actor that builds this node's subblock on top of the current consensus tip, sends it to the
/// next proposer, and collects validated subblocks for that tip.
pub(crate) struct Actor<TContext> {
    /// Sender half of the actor's mailbox; cloned into [`Mailbox`] handles and validation tasks.
    actions_tx: mpsc::UnboundedSender<Message>,
    /// Receiver half of the actor's mailbox.
    actions_rx: mpsc::UnboundedReceiver<Message>,
    /// Stream of subblock transactions obtained from the node's eth API handle.
    subblock_transactions_rx: broadcast::Receiver<Recovered<TempoTxEnvelope>>,
    /// State of the subblock this node is currently building or has already built.
    our_subblock: PendingSubblock,

    /// Source of the signing scheme (and its participant set) for a given epoch.
    scheme_provider: SchemeProvider,
    /// Runtime context used to spawn tasks and to create sleep timers.
    context: TContext,
    /// Key used to sign the subblocks built by this node.
    signer: PrivateKey,
    /// Handle to the execution node (block/state provider, EVM config).
    node: TempoFullNode,
    /// Fee recipient written into the subblocks built by this node.
    fee_recipient: Address,
    /// Time budget for selecting transactions while building a subblock.
    time_to_build_subblock: Duration,
    /// Delay between re-sends of a built subblock that has not been acknowledged yet.
    subblock_broadcast_interval: Duration,
    /// Epoch length passed to `utils::epoch` to map a block number to its epoch.
    epoch_length: u64,

    /// Latest applied consensus round, the tip block hash, and the certificate of the latest
    /// applied event. `None` until the first notarization or finalization is observed.
    consensus_tip: Option<(Round, BlockHash, bls12381_threshold::Signature<MinSig>)>,

    /// Validated subblocks collected for the current tip, keyed by the validator that built
    /// them. Cleared whenever the tip changes.
    subblocks: IndexMap<B256, RecoveredSubBlock>,
    /// Pool of transactions that name this node as their subblock proposer, keyed by
    /// transaction hash. Shared with the subblock build tasks.
    subblock_transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
}
113
114impl<TContext: Spawner + Metrics + Pacer> Actor<TContext> {
115 pub(crate) fn new(
116 Config {
117 context,
118 signer,
119 scheme_provider,
120 node,
121 fee_recipient,
122 time_to_build_subblock,
123 subblock_broadcast_interval,
124 epoch_length,
125 }: Config<TContext>,
126 ) -> Self {
127 let (actions_tx, actions_rx) = mpsc::unbounded();
128 Self {
129 our_subblock: PendingSubblock::None,
130 subblock_transactions_rx: node.add_ons_handle.eth_api().subblock_transactions_rx(),
131 scheme_provider,
132 actions_tx,
133 actions_rx,
134 context,
135 signer,
136 node,
137 fee_recipient,
138 time_to_build_subblock,
139 subblock_broadcast_interval,
140 epoch_length,
141 consensus_tip: None,
142 subblocks: Default::default(),
143 subblock_transactions: Default::default(),
144 }
145 }
146
147 pub(crate) fn mailbox(&self) -> Mailbox {
149 Mailbox {
150 tx: self.actions_tx.clone(),
151 }
152 }
153
    /// Runs the actor's event loop.
    ///
    /// Takes the sender and receiver halves of the network channel used to exchange
    /// subblocks and acknowledgements with other validators. The loop has no exit
    /// condition, so this future only ends if it is dropped or a handler panics.
    pub(crate) async fn run(
        mut self,
        (mut network_tx, mut network_rx): (
            impl Sender<PublicKey = PublicKey>,
            impl Receiver<PublicKey = PublicKey>,
        ),
    ) {
        loop {
            // At most one of these is active, depending on the state of our own subblock:
            // either the handle of the running build task, or the re-broadcast timer of an
            // already built subblock.
            let (subblock_task, broadcast_interval) = match &mut self.our_subblock {
                PendingSubblock::None => (None, None),
                PendingSubblock::Task(task) => (Some(task), None),
                PendingSubblock::Built(built) => (None, Some(&mut built.broadcast_interval)),
            };

            tokio::select! {
                // Poll the branches in the order they are written rather than randomly.
                biased;

                // Message from a mailbox handle or from a validation task.
                Some(action) = self.actions_rx.next() => {
                    self.on_new_message(action);
                },
                // New subblock transaction received from the node.
                Ok(transaction) = self.subblock_transactions_rx.recv() => {
                    self.on_new_subblock_transaction(transaction);
                },
                // Subblock or acknowledgement received from the network. Errors are already
                // reported by the handler's `#[instrument(err)]`, so the result is dropped.
                Ok((sender, message)) = network_rx.recv() => {
                    let _ = self.on_network_message(sender, message, &mut network_tx).await;
                },
                // Our subblock build task finished. `Fuse::terminated()` stands in for the
                // branch when no task is running.
                subblock = if let Some(task) = subblock_task {
                    (&mut task.handle).fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    // This branch can only fire while `our_subblock` is `Task`, so the
                    // unwrap cannot fail.
                    let task = self.our_subblock.take_task().unwrap();
                    self.on_built_subblock(subblock, task.proposer).await;
                }
                // The re-broadcast timer of our built subblock fired.
                _ = if let Some(broadcast_interval) = broadcast_interval {
                    broadcast_interval.fuse()
                } else {
                    futures::future::Fuse::terminated()
                } => {
                    self.broadcast_built_subblock(&mut network_tx).await;
                }
            }
        }
    }
203
204 fn tip(&self) -> Option<BlockHash> {
206 self.consensus_tip.as_ref().map(|(_, tip, _)| *tip)
207 }
208
209 fn on_new_message(&mut self, action: Message) {
210 match action {
211 Message::GetSubBlocks { parent, response } => {
212 if self.tip() != Some(parent) {
214 let _ = response.send(Vec::new());
215 return;
216 }
217 let subblocks = self.subblocks.values().cloned().collect();
219 let _ = response.send(subblocks);
220 }
221 Message::Consensus(activity) => self.on_consensus_event(*activity),
222 Message::ValidatedSubblock(subblock) => self.on_validated_subblock(subblock),
223 }
224 }
225
226 #[instrument(skip_all, fields(transaction.tx_hash = %transaction.tx_hash()))]
227 fn on_new_subblock_transaction(&self, transaction: Recovered<TempoTxEnvelope>) {
228 if !transaction
229 .subblock_proposer()
230 .is_some_and(|k| k.matches(self.signer.public_key()))
231 {
232 return;
233 }
234 let mut txs = self.subblock_transactions.lock();
235 if txs.len() >= MAX_SUBBLOCK_TXS {
236 return;
237 }
238 txs.insert(*transaction.tx_hash(), Arc::new(transaction));
239 }
240
241 #[instrument(skip_all, fields(event.epoch = event.epoch(), event.view = event.view()))]
243 fn on_consensus_event(&mut self, event: Activity<Scheme<PublicKey, MinSig>, Digest>) {
244 let (new_tip, new_round, new_cert) = match event {
245 Activity::Notarization(n) => {
246 (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
247 }
248 Activity::Finalization(n) => {
249 (Some(n.proposal.payload.0), n.proposal.round, n.certificate)
250 }
251 Activity::Nullification(n) => (None, n.round, n.certificate),
252 _ => return,
253 };
254
255 if let Some((round, tip, cert)) = &mut self.consensus_tip
256 && *round <= new_round
257 {
258 *round = new_round;
259 *cert = new_cert;
260
261 if let Some(new_tip) = new_tip
262 && *tip != new_tip
263 {
264 self.subblocks.clear();
266 *tip = new_tip;
267 }
268 } else if self.consensus_tip.is_none()
269 && let Some(new_tip) = new_tip
270 {
271 self.consensus_tip = Some((new_round, new_tip, new_cert));
273 }
274
275 let Some((round, tip, certificate)) = &self.consensus_tip else {
276 return;
277 };
278
279 let Ok(Some(header)) = self
280 .node
281 .provider
282 .find_block_by_hash(*tip, BlockSource::Any)
283 else {
284 debug!(?tip, "missing header for the tip block at {tip}");
285 return;
286 };
287
288 let epoch_of_next_block = utils::epoch(self.epoch_length, header.number() + 1);
289
290 let Some(scheme) = self.scheme_provider.scheme(epoch_of_next_block) else {
292 debug!(epoch_of_next_block, "scheme not found for epoch");
293 return;
294 };
295
296 let next_round = if round.epoch() == epoch_of_next_block {
297 Round::new(round.epoch(), round.view() + 1)
298 } else {
299 Round::new(epoch_of_next_block, 1)
300 };
301
302 let seed = if next_round.view() == 1 {
303 None
305 } else {
306 scheme.seed(*round, certificate)
307 };
308
309 let (next_proposer, _) = select_leader::<Scheme<PublicKey, MinSig>, _>(
310 scheme.participants().as_ref(),
311 next_round,
312 seed,
313 );
314
315 debug!(?next_proposer, ?next_round, "determined next proposer");
316
317 if self.our_subblock.parent_hash() != Some(*tip)
319 || self.our_subblock.target_proposer() != Some(&next_proposer)
320 {
321 debug!(%tip, %next_proposer, "building new subblock");
322 self.build_new_subblock(*tip, next_proposer, scheme);
323 }
324 }
325
326 fn build_new_subblock(
327 &mut self,
328 parent_hash: BlockHash,
329 next_proposer: PublicKey,
330 scheme: Arc<Scheme<PublicKey, MinSig>>,
331 ) {
332 let transactions = self.subblock_transactions.clone();
333 let node = self.node.clone();
334 let num_validators = scheme.participants().len();
335 let signer = self.signer.clone();
336 let fee_recipient = self.fee_recipient;
337 let timeout = self.time_to_build_subblock;
338 let span = Span::current();
339 let handle = self
340 .context
341 .with_label("validate_subblock")
342 .shared(true)
343 .spawn(move |_| {
344 build_subblock(
345 transactions,
346 node,
347 parent_hash,
348 num_validators,
349 signer,
350 fee_recipient,
351 timeout,
352 )
353 .instrument(span)
354 });
355
356 self.our_subblock = PendingSubblock::Task(BuildSubblockTask {
357 handle,
358 parent_hash,
359 proposer: next_proposer,
360 });
361 }
362
363 #[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender, msg_bytes = message.len()))]
364 async fn on_network_message(
365 &mut self,
366 sender: PublicKey,
367 message: bytes::Bytes,
368 network_tx: &mut impl Sender<PublicKey = PublicKey>,
369 ) -> eyre::Result<()> {
370 let message =
371 SubblocksMessage::decode(message).wrap_err("failed to decode network message")?;
372
373 let subblock = match message {
374 SubblocksMessage::Subblock(subblock) => subblock,
375 SubblocksMessage::Ack(ack) => {
377 if let PendingSubblock::Built(built) = &mut self.our_subblock
378 && built.proposer == sender
379 && ack == built.subblock.signature_hash()
380 {
381 debug!("received acknowledgement from the next proposer");
382 built.stop_broadcasting();
383 } else {
384 warn!(%ack, "received invalid acknowledgement");
385 }
386
387 return Ok(());
388 }
389 };
390
391 let Some(tip) = self.tip() else {
392 return Err(eyre::eyre!("missing tip of the chain"));
393 };
394
395 eyre::ensure!(
397 subblock.parent_hash == tip,
398 "invalid subblock parent, expected {tip}, got {}",
399 subblock.parent_hash
400 );
401
402 let _ = network_tx
408 .send(
409 Recipients::One(sender.clone()),
410 SubblocksMessage::Ack(subblock.signature_hash()).encode(),
411 true,
412 )
413 .await;
414
415 debug!("validating new subblock");
416
417 let node = self.node.clone();
419 let validated_subblocks_tx = self.actions_tx.clone();
420 let scheme_provider = self.scheme_provider.clone();
421 let epoch_length = self.epoch_length;
422 let span = Span::current();
423 self.context.clone().shared(true).spawn(move |_| {
424 validate_subblock(
425 sender.clone(),
426 node,
427 subblock,
428 validated_subblocks_tx,
429 scheme_provider,
430 epoch_length,
431 )
432 .instrument(span)
433 });
434
435 Ok(())
436 }
437
438 #[instrument(skip_all, fields(subblock.validator = %subblock.validator(), subblock.parent_hash = %subblock.parent_hash))]
439 fn on_validated_subblock(&mut self, subblock: RecoveredSubBlock) {
440 if Some(subblock.parent_hash) != self.tip() {
442 return;
443 }
444
445 debug!(subblock = ?subblock, "validated subblock");
446
447 self.subblocks.insert(subblock.validator(), subblock);
448 }
449
450 #[instrument(skip_all)]
451 async fn on_built_subblock(
452 &mut self,
453 subblock: Result<RecoveredSubBlock, commonware_runtime::Error>,
454 next_proposer: PublicKey,
455 ) {
456 let subblock = match subblock {
457 Ok(subblock) => subblock,
458 Err(error) => {
459 warn!(%error, "failed to build subblock");
460 return;
461 }
462 };
463
464 if Some(subblock.parent_hash) != self.tip() {
465 return;
466 }
467
468 self.our_subblock = PendingSubblock::Built(BuiltSubblock {
469 subblock,
470 proposer: next_proposer,
471 broadcast_interval: Box::pin(futures::future::ready(())),
473 });
474 }
475
476 #[instrument(skip_all)]
477 async fn broadcast_built_subblock(
478 &mut self,
479 network_tx: &mut impl Sender<PublicKey = PublicKey>,
480 ) {
481 let PendingSubblock::Built(built) = &mut self.our_subblock else {
482 return;
483 };
484
485 built.broadcast_interval = Box::pin(self.context.sleep(self.subblock_broadcast_interval));
487
488 debug!(
489 ?built.subblock,
490 next_proposer = %built.proposer,
491 "sending subblock to the next proposer"
492 );
493
494 if built.proposer != self.signer.public_key() {
495 let _ = network_tx
496 .send(
497 Recipients::One(built.proposer.clone()),
498 SubblocksMessage::Subblock((*built.subblock).clone()).encode(),
499 true,
500 )
501 .await;
502 } else {
503 let subblock = built.subblock.clone();
504 built.stop_broadcasting();
505 self.on_validated_subblock(subblock);
506 }
507 }
508}
509
/// Messages processed by the actor's event loop.
#[derive(Debug)]
enum Message {
    /// Request for the validated subblocks collected on top of `parent`.
    GetSubBlocks {
        /// Hash of the block the requested subblocks must build on.
        parent: BlockHash,
        /// Channel on which the actor sends the answer.
        response: std::sync::mpsc::SyncSender<Vec<RecoveredSubBlock>>,
    },

    /// Consensus activity reported through the [`Reporter`] implementation of [`Mailbox`].
    Consensus(Box<Activity<Scheme<PublicKey, MinSig>, Digest>>),

    /// Subblock that passed validation in a validation task.
    ValidatedSubblock(RecoveredSubBlock),
}
530
/// State of the subblock this node builds for the current tip.
#[derive(Default)]
enum PendingSubblock {
    /// No subblock is being built or broadcast.
    #[default]
    None,
    /// A build task is running.
    Task(BuildSubblockTask),
    /// The subblock has been built and is being sent to the next proposer.
    Built(BuiltSubblock),
}
542
543impl PendingSubblock {
544 fn take_task(&mut self) -> Option<BuildSubblockTask> {
546 if let Self::Task(task) = std::mem::take(self) {
547 Some(task)
548 } else {
549 None
550 }
551 }
552
553 fn parent_hash(&self) -> Option<BlockHash> {
555 match self {
556 Self::Task(task) => Some(task.parent_hash),
557 Self::Built(built) => Some(built.subblock.parent_hash),
558 Self::None => None,
559 }
560 }
561
562 fn target_proposer(&self) -> Option<&PublicKey> {
564 match self {
565 Self::Task(task) => Some(&task.proposer),
566 Self::Built(built) => Some(&built.proposer),
567 Self::None => None,
568 }
569 }
570}
571
/// A running subblock build task.
struct BuildSubblockTask {
    /// Handle resolving to the built subblock.
    handle: Handle<RecoveredSubBlock>,
    /// Hash of the block the subblock is built on.
    parent_hash: BlockHash,
    /// Proposer the subblock will be sent to.
    proposer: PublicKey,
}
581
/// A built subblock together with its broadcast state.
struct BuiltSubblock {
    /// The subblock built and signed by this node.
    subblock: RecoveredSubBlock,
    /// Proposer the subblock is sent to.
    proposer: PublicKey,
    /// Future that resolves when the subblock should be sent (again).
    broadcast_interval: Pin<Box<dyn Future<Output = ()> + Send>>,
}
591
592impl BuiltSubblock {
593 fn stop_broadcasting(&mut self) {
595 self.broadcast_interval = Box::pin(futures::future::pending());
596 }
597}
598
/// Messages exchanged between validators on the subblocks network channel.
#[derive(Debug)]
enum SubblocksMessage {
    /// A signed subblock sent to the next proposer.
    Subblock(SignedSubBlock),
    /// Acknowledgement of a received subblock, carrying its signature hash.
    Ack(B256),
}
607
608impl SubblocksMessage {
609 fn encode(self) -> bytes::Bytes {
611 match self {
612 Self::Subblock(subblock) => alloy_rlp::encode(&subblock).into(),
613 Self::Ack(hash) => bytes::Bytes::copy_from_slice(hash.as_ref()),
614 }
615 }
616
617 fn decode(message: bytes::Bytes) -> alloy_rlp::Result<Self> {
619 if message.len() == 32 {
620 let hash = B256::from_slice(&message);
621 Ok(Self::Ack(hash))
622 } else {
623 let subblock = SignedSubBlock::decode(&mut &*message)?;
624 Ok(Self::Subblock(subblock))
625 }
626 }
627}
628
/// Cloneable handle for sending messages to the [`Actor`].
#[derive(Clone)]
pub(crate) struct Mailbox {
    /// Sender half of the actor's mailbox channel.
    tx: mpsc::UnboundedSender<Message>,
}
634
635impl Mailbox {
636 pub(crate) fn get_subblocks(
637 &self,
638 parent: BlockHash,
639 ) -> Result<Vec<RecoveredSubBlock>, RecvError> {
640 let (tx, rx) = std::sync::mpsc::sync_channel(1);
641 let _ = self.tx.unbounded_send(Message::GetSubBlocks {
642 parent,
643 response: tx,
644 });
645 rx.recv()
646 }
647}
648
649impl Reporter for Mailbox {
650 type Activity = Activity<Scheme<PublicKey, MinSig>, Digest>;
651
652 async fn report(&mut self, activity: Self::Activity) -> () {
653 let _ = self
654 .tx
655 .unbounded_send(Message::Consensus(Box::new(activity)));
656 }
657}
658
659fn evm_at_block(
660 node: &TempoFullNode,
661 hash: BlockHash,
662) -> eyre::Result<TempoEvm<State<StateProviderDatabase<StateProviderBox>>>> {
663 let db = State::builder()
664 .with_database(StateProviderDatabase::new(
665 node.provider.state_by_block_hash(hash)?,
666 ))
667 .build();
668 let header = node
669 .provider
670 .find_block_by_hash(hash, BlockSource::Any)?
671 .ok_or(ProviderError::BestBlockNotFound)?;
672
673 Ok(node.evm_config.evm_for_block(db, &header)?)
674}
675
676#[instrument(skip_all, fields(parent_hash = %parent_hash))]
680async fn build_subblock(
681 transactions: Arc<Mutex<IndexMap<TxHash, Arc<Recovered<TempoTxEnvelope>>>>>,
682 node: TempoFullNode,
683 parent_hash: BlockHash,
684 num_validators: usize,
685 signer: PrivateKey,
686 fee_recipient: Address,
687 timeout: Duration,
688) -> RecoveredSubBlock {
689 let start = Instant::now();
690
691 let (transactions, senders) = match evm_at_block(&node, parent_hash) {
692 Ok(mut evm) => {
693 let mut selected_transactions = Vec::new();
694 let mut senders = Vec::new();
695 let mut gas_left =
696 evm.block().gas_limit / TEMPO_SHARED_GAS_DIVISOR / num_validators as u64;
697
698 let txs = transactions.lock().clone();
699 for (tx_hash, tx) in txs {
700 if tx.gas_limit() > gas_left {
701 continue;
702 }
703 if let Err(err) = evm.transact_commit(&*tx) {
704 warn!(%err, tx_hash = %tx_hash, "invalid subblock candidate transaction");
705 transactions.lock().swap_remove(&tx_hash);
707 continue;
708 }
709
710 gas_left -= tx.gas_limit();
711 selected_transactions.push(tx.inner().clone());
712 senders.push(tx.signer());
713
714 if start.elapsed() > timeout {
715 break;
716 }
717 }
718
719 (selected_transactions, senders)
720 }
721 Err(err) => {
722 warn!(%err, "failed to build an evm at block, building an empty subblock");
723
724 Default::default()
725 }
726 };
727
728 let subblock = SubBlock {
729 version: SubBlockVersion::V1,
730 fee_recipient,
731 parent_hash,
732 transactions,
733 };
734
735 let signature = signer.sign(None, subblock.signature_hash().as_slice());
736 let signed_subblock = SignedSubBlock {
737 inner: subblock,
738 signature: Bytes::copy_from_slice(signature.as_ref()),
739 };
740
741 RecoveredSubBlock::new_unchecked(
742 signed_subblock,
743 senders,
744 B256::from_slice(&signer.public_key()),
745 )
746}
747
748#[instrument(skip_all, err(level = Level::WARN), fields(sender = %sender))]
756async fn validate_subblock(
757 sender: PublicKey,
758 node: TempoFullNode,
759 subblock: SignedSubBlock,
760 actions_tx: mpsc::UnboundedSender<Message>,
761 scheme_provider: SchemeProvider,
762 epoch_length: u64,
763) -> eyre::Result<()> {
764 let Ok(signature) =
765 Signature::decode(&mut subblock.signature.as_ref()).wrap_err("invalid signature")
766 else {
767 return Err(eyre::eyre!("invalid signature"));
768 };
769
770 if !sender.verify(None, subblock.signature_hash().as_slice(), &signature) {
771 return Err(eyre::eyre!("invalid signature"));
772 }
773
774 if subblock.transactions.iter().any(|tx| {
775 tx.subblock_proposer()
776 .is_none_or(|proposer| !proposer.matches(&sender))
777 }) {
778 return Err(eyre::eyre!(
779 "all transactions must specify the subblock validator"
780 ));
781 }
782
783 let subblock = subblock.try_into_recovered(B256::from_slice(&sender))?;
785
786 let mut evm = evm_at_block(&node, subblock.parent_hash)?;
787
788 let epoch = utils::epoch(epoch_length, evm.block().number.to::<u64>() + 1);
789 let scheme = scheme_provider
790 .scheme(epoch)
791 .ok_or_eyre("scheme not found")?;
792
793 eyre::ensure!(
794 scheme.participants().iter().any(|p| p == &sender),
795 "sender is not a validator"
796 );
797
798 let max_size = MAX_RLP_BLOCK_SIZE
802 / TEMPO_SHARED_GAS_DIVISOR as usize
803 / scheme.participants().len() as usize;
804 if subblock.total_tx_size() > max_size {
805 warn!(
806 size = subblock.total_tx_size(),
807 max_size, "subblock is too large, skipping"
808 );
809 return Ok(());
810 }
811
812 for tx in subblock.transactions_recovered() {
813 if let Err(err) = evm.transact_commit(tx) {
814 return Err(eyre::eyre!("transaction failed to execute: {err:?}"));
815 }
816 }
817
818 let _ = actions_tx.unbounded_send(Message::ValidatedSubblock(subblock));
819
820 Ok(())
821}