tempo_commonware_node/follow/
driver.rs1use std::sync::Arc;
8
9use alloy_consensus::BlockHeader as _;
10use commonware_codec::{DecodeExt as _, ReadExt as _};
11use commonware_consensus::{
12 Epochable, Heightable as _, Reporter, marshal,
13 simplex::{
14 scheme::bls12381_threshold::vrf::Scheme,
15 types::{Activity, Finalization},
16 },
17 types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
18};
19use commonware_cryptography::{
20 Signer as _,
21 bls12381::primitives::variant::MinSig,
22 ed25519::{self, PublicKey},
23};
24use commonware_math::algebra::Random as _;
25use commonware_runtime::{Clock, ContextCell, Spawner, spawn_cell};
26use commonware_utils::{Acknowledgement, vec::NonEmptyVec};
27use rand_08::{CryptoRng, Rng};
28
29use eyre::{OptionExt as _, Report, WrapErr as _};
30use reth_node_core::primitives::SealedBlock;
31use reth_provider::HeaderProvider as _;
32use tempo_node::{TempoFullNode, rpc::consensus::Event};
33use tokio::{select, sync::mpsc};
34use tracing::{debug, instrument, warn, warn_span};
35
36use crate::{
37 consensus::{Digest, block::Block},
38 epoch::SchemeProvider,
39 feed,
40};
41
42pub(super) fn try_init<TContext>(
43 context: TContext,
44 config: Config,
45) -> eyre::Result<(Driver<TContext>, Mailbox)> {
46 let (tx, rx) = mpsc::unbounded_channel();
47 let mailbox = Mailbox(tx);
48
49 let last_finalized_number = config
55 .execution_node
56 .provider
57 .canonical_in_memory_state()
58 .get_finalized_num_hash()
59 .map_or(0u64, |num_hash| num_hash.number);
60
61 let epoch_info = config
62 .epoch_strategy
63 .containing(Height::new(last_finalized_number))
64 .expect("strategy valid for all heights and epochs");
65
66 let last_boundary = if epoch_info.last().get() == last_finalized_number {
67 epoch_info.last()
68 } else if let Some(previous) = epoch_info.epoch().previous() {
69 config
70 .epoch_strategy
71 .last(previous)
72 .expect("strategy valid for all heights and epochs")
73 } else {
74 Height::zero()
75 };
76 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
77 &mut config
78 .execution_node
79 .provider
80 .header_by_number(last_boundary.get())
81 .map_err(Report::new)
82 .and_then(|maybe_header| maybe_header.ok_or_eyre("execution layer did not have header"))
83 .wrap_err_with(|| {
84 format!(
85 "cannot establish baseline - unable to read the header \
86 from the last boundary block at height `{last_boundary}` \
87 from the execution layer"
88 )
89 })?
90 .extra_data()
91 .as_ref(),
92 )
93 .wrap_err("the genesis header did not contain a DKG outcome")?;
94
95 config.scheme_provider.register(
96 onchain_outcome.epoch,
97 Scheme::certificate_verifier(
98 crate::config::NAMESPACE,
99 *onchain_outcome.sharing().public(),
100 ),
101 );
102
103 let actor = Driver {
104 context: ContextCell::new(context),
105 config,
106 mailbox: rx,
107 current_epoch: epoch_info.epoch(),
108 last_boundary,
109 };
110 Ok((actor, mailbox))
111}
112
/// Everything the follow [`Driver`] needs from the rest of the node.
pub(super) struct Config {
    /// Execution node handle; queried for the finalized block number and for
    /// block headers when establishing the baseline.
    pub(super) execution_node: Arc<TempoFullNode>,
    /// Registry that receives one certificate verifier per epoch.
    pub(super) scheme_provider: SchemeProvider,

    /// Height used by `heal_gap` to derive the consensus-side epoch.
    // NOTE(review): presumably the height last finalized by the consensus
    // layer (marshal) - confirm against the caller.
    pub(super) last_finalized_height: Height,

    /// Mailbox of the marshal actor (floor updates, block lookups, hints,
    /// verified blocks and finalization reports are sent through it).
    pub(super) marshal: crate::alias::marshal::Mailbox,
    /// Mailbox of the feed actor; receives every forwarded finalization.
    pub(super) feed: feed::Mailbox,
    /// Maps heights to epochs and epochs to their last (boundary) height.
    pub(super) epoch_strategy: FixedEpocher,
}
124
/// Messages delivered to the [`Driver`] through its mailbox.
#[derive(Debug)]
enum Message {
    /// A consensus RPC event, handled by `Driver::process_event`.
    Event(Event),
    /// An update emitted by marshal, handled by `Driver::process_update`.
    Finalized(marshal::Update<Block>),
}
130
131impl From<Event> for Message {
132 fn from(value: Event) -> Self {
133 Self::Event(value)
134 }
135}
136
137impl From<marshal::Update<Block>> for Message {
138 fn from(value: marshal::Update<Block>) -> Self {
139 Self::Finalized(value)
140 }
141}
142
/// Cloneable sending half of the unbounded channel that feeds the [`Driver`].
#[derive(Clone)]
pub(super) struct Mailbox(mpsc::UnboundedSender<Message>);
145
146impl Mailbox {
147 pub(super) fn to_event_reporter(&self) -> EventReporter {
148 EventReporter(self.clone())
149 }
150
151 pub(super) fn to_marshal_reporter(&self) -> MarshalReporter {
152 MarshalReporter(self.clone())
153 }
154
155 fn send(&self, msg: impl Into<Message>) {
156 let _ = self.0.send(msg.into());
157 }
158}
159
/// [`Reporter`] adapter that forwards consensus RPC [`Event`]s into the driver's [`Mailbox`].
#[derive(Clone)]
pub(super) struct EventReporter(Mailbox);
162
163impl Reporter for EventReporter {
164 type Activity = Event;
165
166 async fn report(&mut self, activity: Self::Activity) {
167 self.0.send(activity);
168 }
169}
170
/// [`Reporter`] adapter that forwards marshal updates into the driver's [`Mailbox`].
#[derive(Clone)]
pub(super) struct MarshalReporter(Mailbox);
173
174impl Reporter for MarshalReporter {
175 type Activity = marshal::Update<Block>;
176
177 async fn report(&mut self, activity: Self::Activity) {
178 self.0.send(activity);
179 }
180}
181
/// Actor that drives a node in follow mode: it consumes finalization events
/// and marshal updates from its mailbox and keeps marshal, the feed and the
/// scheme provider up to date.
pub(super) struct Driver<TContext> {
    /// Runtime context; used to spawn the actor and as the randomness source
    /// in `process_event`.
    context: ContextCell<TContext>,
    /// Handles to the collaborating components.
    config: Config,
    /// Receiving half of the channel behind [`Mailbox`].
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// Boundary height established by `try_init`; used as the initial marshal
    /// floor and as the execution-side reference in `heal_gap`.
    last_boundary: Height,
    /// Epoch the driver currently accepts finalizations for; advanced by
    /// `process_update` when a boundary block is delivered.
    current_epoch: Epoch,
}
190
191impl<C: Clock + Rng + CryptoRng> Driver<C>
192where
193 C: Spawner,
194{
    /// Consumes the driver and spawns its `run` loop on the runtime context,
    /// returning the handle of the spawned task.
    pub(super) fn start(mut self) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run())
    }
198
199 async fn run(mut self) {
200 self.config.marshal.set_floor(self.last_boundary).await;
201 if self.heal_gap().await.is_err() {
202 return;
203 };
204
205 loop {
206 select!(
207 biased;
208
209 Some(message) = self.mailbox.recv() => {
210 match message {
211 Message::Event(event) => {
212 let _: Result<_, _> = self.process_event(event).await;
214 }
215 Message::Finalized(update) => {
216 self.process_update(update).await;
217 }
218 }
219 }
220 );
221 }
222 }
223
224 #[instrument(skip_all, err(Display))]
226 async fn heal_gap(&mut self) -> eyre::Result<()> {
227 let current_consensus_epoch = self
228 .config
229 .epoch_strategy
230 .containing(self.config.last_finalized_height)
231 .expect("strategy is valid for all heights and epochs");
232 let current_execution_epoch = self
233 .config
234 .epoch_strategy
235 .containing(self.last_boundary)
236 .expect("strategy is valid for all heights and epochs");
237
238 if let Some(previous) = current_consensus_epoch.epoch().previous()
239 && previous > current_execution_epoch.epoch()
240 {
241 let last_consensus_boundary = self
242 .config
243 .epoch_strategy
244 .last(previous)
245 .expect("strategy is valid for all heights and epochs");
246 let boundary_block = self
247 .config
248 .marshal
249 .get_block(last_consensus_boundary)
250 .await
251 .ok_or_else(|| {
252 eyre::eyre!(
253 "cannot heal finalization gap; consensus layer is \
254 ahead of execution layer, but consensus layer does not \
255 have boundary block at height \
256 `{last_consensus_boundary}`"
257 )
258 })?;
259
260 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
261 &mut boundary_block.header().extra_data().as_ref(),
262 )
263 .wrap_err_with(|| {
264 format!(
265 "the boundary block at height `{last_consensus_boundary}` \
266 contained no or a malformed DKG outcome"
267 )
268 })?;
269
270 self.config.scheme_provider.register(
271 onchain_outcome.epoch,
272 Scheme::certificate_verifier(
273 crate::config::NAMESPACE,
274 *onchain_outcome.sharing().public(),
275 ),
276 );
277 } else {
278 debug!("no gap detected");
279 }
280
281 Ok(())
282 }
283
284 #[instrument(skip_all, err(Display))]
285 async fn process_event(&mut self, event: Event) -> eyre::Result<()> {
286 let Event::Finalized {
287 block: certified, ..
288 } = event
289 else {
290 return Ok(());
291 };
292
293 let finalization = alloy_primitives::hex::decode(&certified.certificate)
296 .map_err(Report::new)
297 .and_then(|bytes| {
298 Finalization::<Scheme<PublicKey, MinSig>, Digest>::decode(&*bytes)
299 .map_err(Report::new)
300 })
301 .wrap_err("event contained a malformed finalization certificate")?;
302
303 if finalization.epoch() > self.current_epoch {
304 let boundary_height = self
305 .config
306 .epoch_strategy
307 .last(self.current_epoch)
308 .expect("strategy is valid for all epochs and heights");
309
310 self.config
311 .marshal
312 .hint_finalized(
313 boundary_height,
314 NonEmptyVec::new(ed25519::PrivateKey::random(&mut self.context).public_key()),
317 )
318 .await;
319
320 if let Some(one_before_boundary) = boundary_height.previous() {
321 self.config.marshal.set_floor(one_before_boundary).await;
322 }
323
324 return Ok(());
325 }
326
327 let height = certified.block.number();
328 let consensus_block = Block::from_execution_block(SealedBlock::seal_slow(certified.block));
329
330 let round = Round::new(Epoch::new(certified.epoch), View::new(certified.view));
332 let activity = Activity::Finalization(finalization);
333 if !self.config.marshal.verified(round, consensus_block).await {
334 warn_span!("follow_driver").in_scope(
335 || warn!(?round, %height, "marshal refused to persist the verified block"),
336 )
337 }
338
339 self.config.marshal.report(activity.clone()).await;
340 self.config.feed.report(activity).await;
341 Ok(())
342 }
343
344 #[instrument(skip_all)]
345 async fn process_update(&mut self, update: marshal::Update<Block>) {
346 let marshal::Update::Block(block, ack) = update else {
347 return;
348 };
349 let epoch_info = self
350 .config
351 .epoch_strategy
352 .containing(block.height())
353 .expect("strategy valid for all heights");
354 if epoch_info.last() == block.height() {
355 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
356 &mut block.header().extra_data().as_ref(),
357 )
358 .expect("boundary blocks must contain DKG outcomes");
359 self.config.scheme_provider.register(
360 onchain_outcome.epoch,
361 Scheme::certificate_verifier(
362 crate::config::NAMESPACE,
363 *onchain_outcome.network_identity(),
364 ),
365 );
366 self.current_epoch = onchain_outcome.epoch;
367 }
368 ack.acknowledge();
369 }
370}