1use std::sync::Arc;
8
9use alloy_consensus::BlockHeader as _;
10use commonware_codec::{DecodeExt as _, ReadExt as _};
11use commonware_consensus::{
12 Epochable, Heightable as _, Reporter, marshal,
13 simplex::{
14 scheme::bls12381_threshold::vrf::Scheme,
15 types::{Activity, Finalization},
16 },
17 types::{Epoch, Epocher as _, FixedEpocher, Height},
18};
19use commonware_cryptography::{
20 Signer as _,
21 bls12381::primitives::variant::MinSig,
22 certificate::Provider,
23 ed25519::{self, PublicKey},
24};
25use commonware_math::algebra::Random as _;
26use commonware_parallel::Sequential;
27use commonware_runtime::{Clock, ContextCell, Spawner, spawn_cell};
28use commonware_utils::{Acknowledgement, vec::NonEmptyVec};
29use rand_08::{CryptoRng, Rng};
30
31use eyre::{OptionExt as _, Report, WrapErr as _, bail, ensure};
32use reth_provider::HeaderProvider as _;
33use tempo_chainspec::NetworkIdentity;
34use tempo_node::{TempoFullNode, rpc::consensus::Event};
35use tokio::{select, sync::mpsc};
36use tracing::{debug, instrument, warn, warn_span};
37
38use crate::{
39 consensus::{Digest, block::Block},
40 epoch::SchemeProvider,
41 feed,
42};
43
44pub(super) fn try_init<TContext>(
45 context: TContext,
46 config: Config,
47) -> eyre::Result<(Driver<TContext>, Mailbox)> {
48 let (tx, rx) = mpsc::unbounded_channel();
49 let mailbox = Mailbox(tx);
50
51 let last_finalized_number = config
57 .execution_node
58 .provider
59 .canonical_in_memory_state()
60 .get_finalized_num_hash()
61 .map_or(0u64, |num_hash| num_hash.number);
62
63 let epoch_info = config
64 .epoch_strategy
65 .containing(Height::new(last_finalized_number))
66 .expect("strategy valid for all heights and epochs");
67
68 let last_boundary = if epoch_info.last().get() == last_finalized_number {
69 epoch_info.last()
70 } else if let Some(previous) = epoch_info.epoch().previous() {
71 config
72 .epoch_strategy
73 .last(previous)
74 .expect("strategy valid for all heights and epochs")
75 } else {
76 Height::zero()
77 };
78 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
79 &mut config
80 .execution_node
81 .provider
82 .header_by_number(last_boundary.get())
83 .map_err(Report::new)
84 .and_then(|maybe_header| maybe_header.ok_or_eyre("execution layer did not have header"))
85 .wrap_err_with(|| {
86 format!(
87 "cannot establish baseline - unable to read the header \
88 from the last boundary block at height `{last_boundary}` \
89 from the execution layer"
90 )
91 })?
92 .extra_data()
93 .as_ref(),
94 )
95 .wrap_err_with(|| {
96 format!("the last boundary (`{last_boundary}`) block header did not contain a DKG outcome")
97 })?;
98
99 config.scheme_provider.register(
100 onchain_outcome.epoch,
101 Scheme::certificate_verifier(
102 crate::config::NAMESPACE,
103 *onchain_outcome.sharing().public(),
104 ),
105 );
106
107 let network_scheme = Arc::new(Scheme::certificate_verifier(
108 crate::config::NAMESPACE,
109 config.network_identity.identity,
110 ));
111
112 let actor = Driver {
113 context: ContextCell::new(context),
114 config,
115 mailbox: rx,
116 current_epoch: epoch_info.epoch(),
117 last_boundary,
118 network_scheme,
119 };
120 Ok((actor, mailbox))
121}
122
/// Everything the follow [`Driver`] needs to be constructed and to run.
pub(super) struct Config {
    /// Execution-layer node; its provider is queried for the finalized block
    /// number and for block headers when the baseline is established.
    pub(super) execution_node: Arc<TempoFullNode>,
    /// Registry of per-epoch certificate verifiers; a verifier is registered
    /// for the epoch of every DKG outcome the driver reads.
    pub(super) scheme_provider: SchemeProvider,
    /// Network identity together with the epoch (`from_epoch`) starting from
    /// which it may be used to verify finalizations when no per-epoch scheme
    /// is registered.
    pub(super) network_identity: NetworkIdentity,

    /// Height used to derive the consensus-side epoch when checking for an
    /// epoch gap at startup.
    ///
    /// NOTE(review): presumably the last height finalized by the consensus
    /// layer — confirm against the caller.
    pub(super) last_finalized_height: Height,

    /// Mailbox of the marshal actor (floor updates, finalization hints,
    /// verified blocks and activity reports are sent through it).
    pub(super) marshal: crate::alias::marshal::Mailbox,
    /// Mailbox of the feed; receives every verified finalization activity.
    pub(super) feed: feed::Mailbox,
    /// Maps heights to epochs and epochs to their last height.
    pub(super) epoch_strategy: FixedEpocher,
}
135
/// Messages delivered to the [`Driver`] through its [`Mailbox`].
#[derive(Debug)]
enum Message {
    /// A reported [`Event`]; boxed before being placed in the message.
    Event(Box<Event>),
    /// An update reported by the marshal.
    Finalized(marshal::Update<Block>),
}
141
142impl From<Event> for Message {
143 fn from(value: Event) -> Self {
144 Self::Event(Box::new(value))
145 }
146}
147
148impl From<marshal::Update<Block>> for Message {
149 fn from(value: marshal::Update<Block>) -> Self {
150 Self::Finalized(value)
151 }
152}
153
/// Cloneable handle for sending [`Message`]s to the [`Driver`] over an
/// unbounded channel.
#[derive(Clone)]
pub(super) struct Mailbox(mpsc::UnboundedSender<Message>);
156
157impl Mailbox {
158 pub(super) fn to_event_reporter(&self) -> EventReporter {
159 EventReporter(self.clone())
160 }
161
162 pub(super) fn to_marshal_reporter(&self) -> MarshalReporter {
163 MarshalReporter(self.clone())
164 }
165
166 fn send(&self, msg: impl Into<Message>) {
167 let _ = self.0.send(msg.into());
168 }
169}
170
/// [`Reporter`] adapter that forwards [`Event`]s to the driver's [`Mailbox`].
#[derive(Clone)]
pub(super) struct EventReporter(Mailbox);
173
174impl Reporter for EventReporter {
175 type Activity = Event;
176
177 async fn report(&mut self, activity: Self::Activity) {
178 self.0.send(activity);
179 }
180}
181
/// [`Reporter`] adapter that forwards marshal updates to the driver's
/// [`Mailbox`].
#[derive(Clone)]
pub(super) struct MarshalReporter(Mailbox);
184
185impl Reporter for MarshalReporter {
186 type Activity = marshal::Update<Block>;
187
188 async fn report(&mut self, activity: Self::Activity) {
189 self.0.send(activity);
190 }
191}
192
/// Actor that verifies reported finalizations and tracks epoch transitions.
///
/// Created by `try_init`; consumes [`Message`]s from its mailbox.
pub(super) struct Driver<TContext> {
    /// Runtime context; also passed as the randomness source when generating
    /// stub peers and verifying certificates.
    context: ContextCell<TContext>,
    /// Static configuration and handles to the collaborating actors.
    config: Config,
    /// Receiving half of the channel behind [`Mailbox`].
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// Height of the last epoch boundary derived from the execution layer at
    /// construction; used as the initial marshal floor.
    last_boundary: Height,
    /// Epoch the driver currently considers itself to be in; updated for
    /// every block update received from the marshal.
    current_epoch: Epoch,
    /// Verifier built from the configured network identity; used when no
    /// per-epoch scheme is registered and the finalization's epoch is at or
    /// past the identity's `from_epoch`.
    network_scheme: Arc<Scheme<PublicKey, MinSig>>,
}
202
203impl<C: Clock + Rng + CryptoRng> Driver<C>
204where
205 C: Spawner,
206{
    /// Consumes the driver and launches [`Self::run`] via `spawn_cell!`,
    /// returning the handle of the spawned task.
    pub(super) fn start(mut self) -> commonware_runtime::Handle<()> {
        spawn_cell!(self.context, self.run())
    }
210
211 async fn run(mut self) {
212 self.config.marshal.set_floor(self.last_boundary).await;
213 if self.heal_gap().await.is_err() {
214 return;
215 };
216
217 loop {
218 select!(
219 biased;
220
221 Some(message) = self.mailbox.recv() => {
222 match message {
223 Message::Event(event) => {
224 let _: Result<_, _> = self.process_event(*event).await;
226 }
227 Message::Finalized(update) => {
228 self.process_update(update).await;
229 }
230 }
231 }
232 );
233 }
234 }
235
236 #[instrument(skip_all, err(Display))]
238 async fn heal_gap(&mut self) -> eyre::Result<()> {
239 let current_consensus_epoch = self
240 .config
241 .epoch_strategy
242 .containing(self.config.last_finalized_height)
243 .expect("strategy is valid for all heights and epochs");
244
245 let current_execution_epoch = self
246 .config
247 .epoch_strategy
248 .containing(self.last_boundary)
249 .expect("strategy is valid for all heights and epochs");
250
251 if let Some(previous) = current_consensus_epoch.epoch().previous()
252 && previous > current_execution_epoch.epoch()
253 {
254 let last_consensus_boundary = self
255 .config
256 .epoch_strategy
257 .last(previous)
258 .expect("strategy is valid for all heights and epochs");
259
260 let Some(boundary_block) = self.config.marshal.get_block(last_consensus_boundary).await
261 else {
262 let consensus_epoch = current_consensus_epoch.epoch();
263 let execution_epoch = current_execution_epoch.epoch();
264 warn!(
265 "cannot heal finalization gap; consensus layer epoch {consensus_epoch} is ahead \
266 of execution layer epoch {execution_epoch}, but the consensus layer does not have \
267 the boundary block at height `{last_consensus_boundary}`. The node likely previously skipped \
268 epoch boundaries via the network identity and will continue to try use it to verify finalizations"
269 );
270
271 return Ok(());
272 };
273
274 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
275 &mut boundary_block.header().extra_data().as_ref(),
276 )
277 .wrap_err_with(|| {
278 format!(
279 "the boundary block at height `{last_consensus_boundary}` \
280 contained no or a malformed DKG outcome"
281 )
282 })?;
283
284 self.config.scheme_provider.register(
285 onchain_outcome.epoch,
286 Scheme::certificate_verifier(
287 crate::config::NAMESPACE,
288 *onchain_outcome.sharing().public(),
289 ),
290 );
291 } else {
292 debug!("no gap detected");
293 }
294
295 Ok(())
296 }
297
298 #[instrument(skip_all, err(Display))]
299 async fn process_event(&mut self, event: Event) -> eyre::Result<()> {
300 let Event::Finalized {
301 block: certified, ..
302 } = event
303 else {
304 return Ok(());
305 };
306
307 let finalization = alloy_primitives::hex::decode(&certified.certificate)
309 .map_err(Report::new)
310 .and_then(|bytes| {
311 Finalization::<Scheme<PublicKey, MinSig>, Digest>::decode(&*bytes)
312 .map_err(Report::new)
313 })
314 .wrap_err("event contained a malformed finalization certificate")?;
315
316 let height = Height::new(certified.block.number());
317 let consensus_block = Block::from_execution_block_unchecked(certified.block, None);
318 ensure!(
319 finalization.proposal.payload == consensus_block.digest(),
320 "mismatch in finalization and block digest"
321 );
322
323 let finalization_epoch = finalization.epoch();
324 if finalization_epoch > self.current_epoch {
325 let stub_peers =
326 NonEmptyVec::new(ed25519::PrivateKey::random(&mut self.context).public_key());
327
328 let boundary_height = self
329 .config
330 .epoch_strategy
331 .last(self.current_epoch)
332 .expect("strategy is valid for all epochs and heights");
333
334 debug!(
335 %self.current_epoch,
336 %boundary_height,
337 "hinting to sync system that a finalization certificate might be \
338 available for our current epoch",
339 );
340
341 self.config
344 .marshal
345 .hint_finalized(boundary_height, stub_peers.clone())
346 .await;
347
348 if let Some(one_before_boundary) = boundary_height.previous() {
349 self.config.marshal.set_floor(one_before_boundary).await;
350 }
351
352 let network_identity = self.config.network_identity.clone();
353 if finalization_epoch.get() < network_identity.from_epoch {
354 return Ok(());
355 }
356 }
357
358 let can_use_network_identity_fallback =
359 finalization_epoch.get() >= self.config.network_identity.from_epoch;
360
361 let scheme = match self.config.scheme_provider.scoped(finalization_epoch) {
362 Some(scheme) => scheme,
363 None if can_use_network_identity_fallback => self.network_scheme.clone(),
364 None => bail!(
365 "finalization epoch `{finalization_epoch}` behind network identity starting epoch `{}`; current epoch `{}`",
366 self.config.network_identity.from_epoch,
367 self.current_epoch
368 ),
369 };
370
371 if finalization.verify(&mut self.context, &scheme, &Sequential) {
374 let round = finalization.round();
375 let activity = Activity::Finalization(finalization);
376 if !self.config.marshal.verified(round, consensus_block).await {
377 warn_span!("follow_driver").in_scope(
378 || warn!(?round, %height, "marshal refused to persist the verified block"),
379 )
380 }
381
382 if let Some(one_before_block) = height.previous() {
383 self.config.marshal.set_floor(one_before_block).await;
384 }
385
386 self.config.marshal.report(activity.clone()).await;
387 self.config.feed.report(activity).await;
388 } else {
389 debug!(%finalization_epoch, %height, "failed finalization certificate verification")
390 }
391
392 Ok(())
393 }
394
395 #[instrument(skip_all)]
396 async fn process_update(&mut self, update: marshal::Update<Block>) {
397 let marshal::Update::Block(block, ack) = update else {
398 return;
399 };
400
401 let epoch_info = self
402 .config
403 .epoch_strategy
404 .containing(block.height())
405 .expect("strategy valid for all heights");
406
407 if epoch_info.last() == block.height() {
408 let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
409 &mut block.header().extra_data().as_ref(),
410 )
411 .expect("boundary blocks must contain DKG outcomes");
412
413 let network_identity = &self.config.network_identity;
414 if onchain_outcome.epoch.get() >= network_identity.from_epoch
415 && network_identity.identity != *onchain_outcome.network_identity()
416 {
417 warn!(
418 compiled_from_epoch = network_identity.from_epoch,
419 onchain_epoch = %onchain_outcome.epoch,
420 compiled_network_identity = %network_identity.identity,
421 onchain_network_identity = %onchain_outcome.network_identity(),
422 "Network identity differs from the onchain DKG outcome!!! Update the binary with the latest network identity"
423 );
424 }
425
426 self.config.scheme_provider.register(
427 onchain_outcome.epoch,
428 Scheme::certificate_verifier(
429 crate::config::NAMESPACE,
430 *onchain_outcome.network_identity(),
431 ),
432 );
433
434 self.current_epoch = onchain_outcome.epoch;
435 } else {
436 self.current_epoch = epoch_info.epoch();
438 }
439
440 ack.acknowledge();
441 }
442}