1pub(crate) mod marshal {
5 use std::{num::NonZeroUsize, sync::Arc};
6
7 use commonware_consensus::{
8 marshal::{self, core, standard::Standard},
9 simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Finalization},
10 types::{FixedEpocher, Height, ViewDelta},
11 };
12 use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
13 use commonware_parallel::Sequential;
14 use commonware_runtime::{
15 BufferPooler, Clock, Metrics, Spawner, Storage, buffer::paged::CacheRef,
16 };
17 use commonware_storage::archive::immutable;
18 use commonware_utils::acknowledgement::Exact;
19 use eyre::WrapErr as _;
20 use rand_08::{CryptoRng, Rng};
21 use reth_ethereum::provider::db::DatabaseEnv;
22 use reth_node_builder::NodeTypesWithDBAdapter;
23 use reth_provider::providers::BlockchainProvider;
24 use tempo_node::{TempoFullNode, node::TempoNode};
25 use tracing::{info, instrument};
26
27 use crate::{
28 consensus::{Digest, block::Block},
29 epoch::SchemeProvider,
30 storage::{self, Hybrid},
31 };
32
    /// The concrete marshal actor type used by this module.
    ///
    /// This is `core::Actor` with every type parameter except the runtime
    /// context `TContext` pinned down:
    ///
    /// - blocks are handled as `Standard<Block>`,
    /// - signing schemes come from a `SchemeProvider`,
    /// - finalizations live in an `immutable::Archive` holding
    ///   `Finalization<Scheme<PublicKey, MinSig>, Digest>` values,
    /// - finalized blocks live in a `Hybrid` store parameterized by reth's
    ///   `BlockchainProvider` for `TempoNode`,
    /// - epochs are derived with a `FixedEpocher`,
    /// - the `Sequential` strategy and `Exact` acknowledgement are used.
    pub(crate) type Actor<TContext> = core::Actor<
        TContext,
        Standard<Block>,
        SchemeProvider,
        immutable::Archive<TContext, Digest, Finalization<Scheme<PublicKey, MinSig>, Digest>>,
        Hybrid<TContext, BlockchainProvider<NodeTypesWithDBAdapter<TempoNode, DatabaseEnv>>>,
        FixedEpocher,
        Sequential,
        Exact,
    >;
43
    /// The mailbox type returned alongside [`Actor`] by [`init`].
    ///
    /// This is `core::Mailbox` specialized to the BLS12-381 threshold VRF
    /// scheme (`Scheme<PublicKey, MinSig>`) and `Standard<Block>`.
    pub(crate) type Mailbox = core::Mailbox<Scheme<PublicKey, MinSig>, Standard<Block>>;
45
    /// Settings consumed by [`init`] when setting up the marshal actor and the
    /// storage it relies on.
    pub(crate) struct Config {
        /// Prefix passed to both storage initializers and forwarded as the
        /// marshal's own `partition_prefix`. Also recorded on the tracing span
        /// of [`init`].
        pub partition_prefix: String,

        /// Forwarded unchanged as the marshal's `mailbox_size`.
        /// NOTE(review): presumably the capacity of the actor's mailbox — confirm.
        pub mailbox_size: usize,

        /// Forwarded unchanged as the marshal's `view_retention_timeout`.
        /// NOTE(review): presumably how many views of data are retained before
        /// pruning — confirm against the marshal documentation.
        pub view_retention_timeout: ViewDelta,

        /// Forwarded unchanged as the marshal's `max_pending_acks`; the type
        /// guarantees it is non-zero.
        pub max_pending_acks: NonZeroUsize,

        /// Passed to `storage::init_finalized_blocks` when opening the hybrid
        /// finalized blocks store.
        /// NOTE(review): unit (blocks vs. something else) is not visible here — confirm.
        pub finalized_blocks_retention: u64,

        /// Forwarded as the marshal's `epocher`.
        pub epoch_strategy: FixedEpocher,

        /// Forwarded as the marshal's `provider`.
        pub scheme_provider: SchemeProvider,
    }
77
    /// Everything produced by a successful call to [`init`].
    pub(crate) struct Initialized<TContext>
    where
        TContext: Clock + Metrics + Spawner + Storage + BufferPooler + Rng + CryptoRng,
    {
        /// The marshal actor as returned by `core::Actor::init`. [`init`] does
        /// not start it; that is left to the caller.
        pub actor: Actor<TContext>,

        /// The mailbox paired with [`Self::actor`].
        pub mailbox: Mailbox,

        /// The greater of the height reported by the marshal on initialization
        /// and the finalized block number held in the execution node's
        /// in-memory state (treated as 0 when the node reports none).
        pub last_finalized_height: Height,
    }
96
97 #[instrument(
107 skip_all,
108 fields(partition_prefix = %config.partition_prefix),
109 err(Display)
110 )]
111 pub(crate) async fn init<TContext>(
112 context: TContext,
113 page_cache: CacheRef,
114 execution_node: Arc<TempoFullNode>,
115 config: Config,
116 ) -> eyre::Result<Initialized<TContext>>
117 where
118 TContext: Clock
119 + Metrics
120 + Spawner
121 + Storage
122 + BufferPooler
123 + Rng
124 + CryptoRng
125 + Clone
126 + Send
127 + 'static,
128 {
129 let finalizations_by_height = storage::init_finalizations_archive(
130 &context,
131 &config.partition_prefix,
132 page_cache.clone(),
133 )
134 .await
135 .wrap_err("failed to initialize finalizations by height archive")?;
136
137 let finalized_blocks = storage::init_finalized_blocks(
138 &context,
139 &config.partition_prefix,
140 page_cache.clone(),
141 execution_node.provider.clone(),
142 config.finalized_blocks_retention,
143 )
144 .await
145 .wrap_err("failed to initialize hybrid finalized blocks store")?;
146
147 let (actor, mailbox, marshal_stored_height) = core::Actor::init(
148 context.with_label("marshal"),
149 finalizations_by_height,
150 finalized_blocks,
151 marshal::Config {
152 provider: config.scheme_provider,
153 epocher: config.epoch_strategy,
154 partition_prefix: config.partition_prefix,
155 mailbox_size: config.mailbox_size,
156 view_retention_timeout: config.view_retention_timeout,
157 prunable_items_per_section: storage::PRUNABLE_ITEMS_PER_SECTION,
158 page_cache,
159 replay_buffer: storage::REPLAY_BUFFER,
160 key_write_buffer: storage::WRITE_BUFFER,
161 value_write_buffer: storage::WRITE_BUFFER,
162 max_repair: storage::MAX_REPAIR,
163 max_pending_acks: config.max_pending_acks,
164 block_codec_config: (),
165 strategy: Sequential,
166 },
167 )
168 .await;
169
170 let reth_finalized_height = execution_node
175 .provider
176 .canonical_in_memory_state()
177 .get_finalized_num_hash()
178 .map(|nh| nh.number)
179 .unwrap_or(0);
180 let last_finalized_height = marshal_stored_height.max(Height::new(reth_finalized_height));
181 if last_finalized_height > marshal_stored_height {
182 info!(
183 marshal_stored = %marshal_stored_height,
184 reth_finalized = reth_finalized_height,
185 "advancing marshal sync floor to reth's finalized block"
186 );
187 mailbox.set_floor(last_finalized_height).await;
188 }
189
190 Ok(Initialized {
191 actor,
192 mailbox,
193 last_finalized_height,
194 })
195 }
196}