Skip to main content

tempo_consensus/
alias.rs

1//! A collection of aliases and shared initialization for frequently used
2//! (primarily commonware) types.
3
4pub(crate) mod marshal {
5    use std::{num::NonZeroUsize, sync::Arc};
6
7    use commonware_consensus::{
8        marshal::{self, core, standard::Standard},
9        simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization},
10        types::{FixedEpocher, Height, ViewDelta},
11    };
12    use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
13    use commonware_parallel::Sequential;
14    use commonware_runtime::{
15        BufferPooler, Clock, Metrics, Spawner, Storage, buffer::paged::CacheRef,
16    };
17    use commonware_storage::archive::immutable;
18    use commonware_utils::acknowledgement::Exact;
19    use eyre::WrapErr as _;
20    use rand_08::{CryptoRng, Rng};
21    use reth_ethereum::provider::db::DatabaseEnv;
22    use reth_node_builder::NodeTypesWithDBAdapter;
23    use reth_provider::providers::BlockchainProvider;
24    use tempo_node::{TempoFullNode, node::TempoNode};
25    use tracing::{info, instrument};
26
27    use crate::{
28        consensus::{Digest, block::Block},
29        epoch::SchemeProvider,
30        storage::{self, Hybrid},
31    };
32
33    pub(crate) type Actor<TContext> = core::Actor<
34        TContext,
35        Standard<Block>,
36        SchemeProvider,
37        immutable::Archive<TContext, Digest, Finalization<Scheme<PublicKey, MinSig>, Digest>>,
38        Hybrid<TContext, BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>>>,
39        FixedEpocher,
40        Sequential,
41        Exact,
42    >;
43
44    pub(crate) type Mailbox = core::Mailbox<Scheme<PublicKey, MinSig>, Standard<Block>>;
45
46    /// Settings shared by both engines when initializing the marshal actor
47    /// and its backing finalized-blocks store.
48    pub(crate) struct Config {
49        /// Partition prefix shared with the engine's other on-disk archives.
50        pub partition_prefix: String,
51
52        /// Marshal mailbox capacity.
53        pub mailbox_size: usize,
54
55        /// Minimum number of views to retain temporary marshal data after a
56        /// block is processed. The two engines pick very different values for
57        /// this — consensus keeps state around long enough to serve peers,
58        /// follow mode does not — so the caller computes it.
59        pub view_retention_timeout: ViewDelta,
60
61        /// Maximum number of marshal-dispatched blocks the application may
62        /// buffer before acknowledging.
63        pub max_pending_acks: NonZeroUsize,
64
65        /// Number of recently finalized blocks retained in the prunable
66        /// archive. Older blocks are served from reth via [`Hybrid`].
67        pub finalized_blocks_retention: u64,
68
69        /// Epoch length / boundary configuration.
70        pub epoch_strategy: FixedEpocher,
71
72        /// Provider for epoch-specific signing schemes used by marshal to
73        /// verify finalizations. The same instance is shared with the rest of
74        /// the engine, so the caller passes it in.
75        pub scheme_provider: SchemeProvider,
76    }
77
78    /// Marshal actor + mailbox + the height marshal will resume from,
79    /// returned by [`init`].
80    pub(crate) struct Initialized<TContext>
81    where
82        TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Rng + CryptoRng,
83    {
84        /// The marshal actor, ready to be started.
85        pub actor: Actor<TContext>,
86
87        /// Mailbox for sending messages to [`Self::actor`].
88        pub mailbox: Mailbox,
89
90        /// `max(marshal_stored_height, reth_finalized_height)` after
91        /// advancing marshal's sync floor to that height. The engine uses
92        /// this to seed the executor and other actors that need to know
93        /// where the chain starts replaying from.
94        pub last_finalized_height: Height,
95    }
96
97    /// Initialize the marshal actor and its backing finalized-blocks store
98    /// (the finalizations-by-height archive plus the [`Hybrid`] finalized
99    /// blocks store), and advance marshal's sync floor to
100    /// `max(marshal_stored_height, reth_finalized_height)`.
101    ///
102    /// Both the consensus and follow engines must initialize marshal in
103    /// exactly the same way so that nodes can switch modes without data
104    /// migration. Use this function to maintain that invariant; differences
105    /// between the two engines belong in [`Config`].
106    #[instrument(
107        skip_all,
108        fields(partition_prefix = %config.partition_prefix),
109        err(Display)
110    )]
111    pub(crate) async fn init<TContext>(
112        context: TContext,
113        page_cache: CacheRef,
114        execution_node: Arc<TempoFullNode>,
115        config: Config,
116    ) -> eyre::Result<Initialized<TContext>>
117    where
118        TContext: Clock
119            + Metrics
120            + Spawner
121            + Storage
122            + BufferPooler
123            + Rng
124            + CryptoRng
125            + Clone
126            + Send
127            + 'static,
128    {
129        let finalizations_by_height = storage::init_finalizations_archive(
130            &context,
131            &config.partition_prefix,
132            page_cache.clone(),
133        )
134        .await
135        .wrap_err("failed to initialize finalizations by height archive")?;
136
137        let finalized_blocks = storage::init_finalized_blocks(
138            &context,
139            &config.partition_prefix,
140            page_cache.clone(),
141            execution_node.provider.clone(),
142            config.finalized_blocks_retention,
143        )
144        .await
145        .wrap_err("failed to initialize hybrid finalized blocks store")?;
146
147        let (actor, mailbox, marshal_stored_height) = core::Actor::init(
148            context.with_label("marshal"),
149            finalizations_by_height,
150            finalized_blocks,
151            marshal::Config {
152                provider: config.scheme_provider,
153                epocher: config.epoch_strategy,
154                partition_prefix: config.partition_prefix,
155                mailbox_size: config.mailbox_size,
156                view_retention_timeout: config.view_retention_timeout,
157                prunable_items_per_section: storage::PRUNABLE_ITEMS_PER_SECTION,
158                page_cache,
159                replay_buffer: storage::REPLAY_BUFFER,
160                key_write_buffer: storage::WRITE_BUFFER,
161                value_write_buffer: storage::WRITE_BUFFER,
162                max_repair: storage::MAX_REPAIR,
163                max_pending_acks: config.max_pending_acks,
164                block_codec_config: (),
165                strategy: Sequential,
166            },
167        )
168        .await;
169
170        // Floor marshal at reth's last finalized block so we don't try to
171        // re-sync history that the execution layer already finalized. The
172        // mailbox message is buffered until the actor starts; `set_floor` only
173        // ever advances, so sending it unconditionally is safe.
174        let reth_finalized_height = execution_node
175            .provider
176            .canonical_in_memory_state()
177            .get_finalized_num_hash()
178            .map(|nh| nh.number)
179            .unwrap_or(0);
180        let last_finalized_height = marshal_stored_height.max(Height::new(reth_finalized_height));
181        if last_finalized_height > marshal_stored_height {
182            info!(
183                marshal_stored = %marshal_stored_height,
184                reth_finalized = reth_finalized_height,
185                "advancing marshal sync floor to reth's finalized block"
186            );
187            mailbox.set_floor(last_finalized_height).await;
188        }
189
190        Ok(Initialized {
191            actor,
192            mailbox,
193            last_finalized_height,
194        })
195    }
196}