Skip to main content

tempo_consensus/follow/
driver.rs

1//! Follower sync driver.
2//!
3//! Subscribes to upstream finalization events and processes epoch boundary
4//! blocks for DKG scheme extraction. Non-boundary blocks are synced by Reth
5//! via P2P and fetched by marshal's gap-repair resolver on demand.
6
7use std::sync::Arc;
8
9use alloy_consensus::BlockHeader as _;
10use commonware_codec::{DecodeExt as _, ReadExt as _};
11use commonware_consensus::{
12    Epochable, Heightable as _, Reporter, marshal,
13    simplex::{
14        scheme::bls12381_threshold::vrf::Scheme,
15        types::{Activity, Finalization},
16    },
17    types::{Epoch, Epocher as _, FixedEpocher, Height},
18};
19use commonware_cryptography::{
20    Signer as _,
21    bls12381::primitives::variant::MinSig,
22    certificate::Provider,
23    ed25519::{self, PublicKey},
24};
25use commonware_math::algebra::Random as _;
26use commonware_parallel::Sequential;
27use commonware_runtime::{Clock, ContextCell, Spawner, spawn_cell};
28use commonware_utils::{Acknowledgement, vec::NonEmptyVec};
29use rand_08::{CryptoRng, Rng};
30
31use eyre::{OptionExt as _, Report, WrapErr as _, bail, ensure};
32use reth_provider::HeaderProvider as _;
33use tempo_chainspec::NetworkIdentity;
34use tempo_node::{TempoFullNode, rpc::consensus::Event};
35use tokio::{select, sync::mpsc};
36use tracing::{debug, instrument, warn, warn_span};
37
38use crate::{
39    consensus::{Digest, block::Block},
40    epoch::SchemeProvider,
41    feed,
42};
43
44pub(super) fn try_init<TContext>(
45    context: TContext,
46    config: Config,
47) -> eyre::Result<(Driver<TContext>, Mailbox)> {
48    let (tx, rx) = mpsc::unbounded_channel();
49    let mailbox = Mailbox(tx);
50
51    // Use the last boundary block available in the execution layer as the
52    // trusted starting point.
53    //
54    // TODO: Provide a certificate with the latest boundary to not just trust
55    // but also verify.
56    let last_finalized_number = config
57        .execution_node
58        .provider
59        .canonical_in_memory_state()
60        .get_finalized_num_hash()
61        .map_or(0u64, |num_hash| num_hash.number);
62
63    let epoch_info = config
64        .epoch_strategy
65        .containing(Height::new(last_finalized_number))
66        .expect("strategy valid for all heights and epochs");
67
68    let last_boundary = if epoch_info.last().get() == last_finalized_number {
69        epoch_info.last()
70    } else if let Some(previous) = epoch_info.epoch().previous() {
71        config
72            .epoch_strategy
73            .last(previous)
74            .expect("strategy valid for all heights and epochs")
75    } else {
76        Height::zero()
77    };
78    let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
79        &mut config
80            .execution_node
81            .provider
82            .header_by_number(last_boundary.get())
83            .map_err(Report::new)
84            .and_then(|maybe_header| maybe_header.ok_or_eyre("execution layer did not have header"))
85            .wrap_err_with(|| {
86                format!(
87                    "cannot establish baseline - unable to read the header \
88                    from the last boundary block at height `{last_boundary}` \
89                    from the execution layer"
90                )
91            })?
92            .extra_data()
93            .as_ref(),
94    )
95    .wrap_err_with(|| {
96        format!("the last boundary (`{last_boundary}`) block header did not contain a DKG outcome")
97    })?;
98
99    config.scheme_provider.register(
100        onchain_outcome.epoch,
101        Scheme::certificate_verifier(
102            crate::config::NAMESPACE,
103            *onchain_outcome.sharing().public(),
104        ),
105    );
106
107    let network_scheme = Arc::new(Scheme::certificate_verifier(
108        crate::config::NAMESPACE,
109        config.network_identity.identity,
110    ));
111
112    let actor = Driver {
113        context: ContextCell::new(context),
114        config,
115        mailbox: rx,
116        current_epoch: epoch_info.epoch(),
117        last_boundary,
118        network_scheme,
119    };
120    Ok((actor, mailbox))
121}
122
123pub(super) struct Config {
124    pub(super) execution_node: Arc<TempoFullNode>,
125    pub(super) scheme_provider: SchemeProvider,
126    pub(super) network_identity: NetworkIdentity,
127
128    // TODO: What to do with this information?
129    pub(super) last_finalized_height: Height,
130
131    pub(super) marshal: crate::alias::marshal::Mailbox,
132    pub(super) feed: feed::Mailbox,
133    pub(super) epoch_strategy: FixedEpocher,
134}
135
136#[derive(Debug)]
137enum Message {
138    Event(Box<Event>),
139    Finalized(marshal::Update<Block>),
140}
141
142impl From<Event> for Message {
143    fn from(value: Event) -> Self {
144        Self::Event(Box::new(value))
145    }
146}
147
148impl From<marshal::Update<Block>> for Message {
149    fn from(value: marshal::Update<Block>) -> Self {
150        Self::Finalized(value)
151    }
152}
153
154#[derive(Clone)]
155pub(super) struct Mailbox(mpsc::UnboundedSender<Message>);
156
157impl Mailbox {
158    pub(super) fn to_event_reporter(&self) -> EventReporter {
159        EventReporter(self.clone())
160    }
161
162    pub(super) fn to_marshal_reporter(&self) -> MarshalReporter {
163        MarshalReporter(self.clone())
164    }
165
166    fn send(&self, msg: impl Into<Message>) {
167        let _ = self.0.send(msg.into());
168    }
169}
170
171#[derive(Clone)]
172pub(super) struct EventReporter(Mailbox);
173
174impl Reporter for EventReporter {
175    type Activity = Event;
176
177    async fn report(&mut self, activity: Self::Activity) {
178        self.0.send(activity);
179    }
180}
181
182#[derive(Clone)]
183pub(super) struct MarshalReporter(Mailbox);
184
185impl Reporter for MarshalReporter {
186    type Activity = marshal::Update<Block>;
187
188    async fn report(&mut self, activity: Self::Activity) {
189        self.0.send(activity);
190    }
191}
192
193pub(super) struct Driver<TContext> {
194    context: ContextCell<TContext>,
195    config: Config,
196    mailbox: mpsc::UnboundedReceiver<Message>,
197
198    last_boundary: Height,
199    current_epoch: Epoch,
200    network_scheme: Arc<Scheme<PublicKey, MinSig>>,
201}
202
203impl<C: Clock + Rng + CryptoRng> Driver<C>
204where
205    C: Spawner,
206{
207    pub(super) fn start(mut self) -> commonware_runtime::Handle<()> {
208        spawn_cell!(self.context, self.run())
209    }
210
211    async fn run(mut self) {
212        self.config.marshal.set_floor(self.last_boundary).await;
213        if self.heal_gap().await.is_err() {
214            return;
215        };
216
217        loop {
218            select!(
219                biased;
220
221                Some(message) = self.mailbox.recv() => {
222                    match message {
223                        Message::Event(event) => {
224                            // Emits an event on error.
225                            let _: Result<_, _> = self.process_event(*event).await;
226                        }
227                        Message::Finalized(update) => {
228                            self.process_update(update).await;
229                        }
230                    }
231                }
232            );
233        }
234    }
235
236    /// Fills in the missing scheme if the execution layer did not persist.
237    #[instrument(skip_all, err(Display))]
238    async fn heal_gap(&mut self) -> eyre::Result<()> {
239        let current_consensus_epoch = self
240            .config
241            .epoch_strategy
242            .containing(self.config.last_finalized_height)
243            .expect("strategy is valid for all heights and epochs");
244
245        let current_execution_epoch = self
246            .config
247            .epoch_strategy
248            .containing(self.last_boundary)
249            .expect("strategy is valid for all heights and epochs");
250
251        if let Some(previous) = current_consensus_epoch.epoch().previous()
252            && previous > current_execution_epoch.epoch()
253        {
254            let last_consensus_boundary = self
255                .config
256                .epoch_strategy
257                .last(previous)
258                .expect("strategy is valid for all heights and epochs");
259
260            let Some(boundary_block) = self.config.marshal.get_block(last_consensus_boundary).await
261            else {
262                let consensus_epoch = current_consensus_epoch.epoch();
263                let execution_epoch = current_execution_epoch.epoch();
264                warn!(
265                    "cannot heal finalization gap; consensus layer epoch {consensus_epoch} is ahead \
266                    of execution layer epoch {execution_epoch}, but the consensus layer does not have \
267                    the boundary block at height `{last_consensus_boundary}`. The node likely previously skipped \
268                    epoch boundaries via the network identity and will continue to try use it to verify finalizations"
269                );
270
271                return Ok(());
272            };
273
274            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
275                &mut boundary_block.header().extra_data().as_ref(),
276            )
277            .wrap_err_with(|| {
278                format!(
279                    "the boundary block at height `{last_consensus_boundary}` \
280                contained no or a malformed DKG outcome"
281                )
282            })?;
283
284            self.config.scheme_provider.register(
285                onchain_outcome.epoch,
286                Scheme::certificate_verifier(
287                    crate::config::NAMESPACE,
288                    *onchain_outcome.sharing().public(),
289                ),
290            );
291        } else {
292            debug!("no gap detected");
293        }
294
295        Ok(())
296    }
297
298    #[instrument(skip_all, err(Display))]
299    async fn process_event(&mut self, event: Event) -> eyre::Result<()> {
300        let Event::Finalized {
301            block: certified, ..
302        } = event
303        else {
304            return Ok(());
305        };
306
307        // TODO: ensure well-formedness at the type level so we don't need extra decoding here.
308        let finalization = alloy_primitives::hex::decode(&certified.certificate)
309            .map_err(Report::new)
310            .and_then(|bytes| {
311                Finalization::<Scheme<PublicKey, MinSig>, Digest>::decode(&*bytes)
312                    .map_err(Report::new)
313            })
314            .wrap_err("event contained a malformed finalization certificate")?;
315
316        let height = Height::new(certified.block.number());
317        let consensus_block = Block::from_execution_block_unchecked(certified.block, None);
318        ensure!(
319            finalization.proposal.payload == consensus_block.digest(),
320            "mismatch in finalization and block digest"
321        );
322
323        let finalization_epoch = finalization.epoch();
324        if finalization_epoch > self.current_epoch {
325            let stub_peers =
326                NonEmptyVec::new(ed25519::PrivateKey::random(&mut self.context).public_key());
327
328            let boundary_height = self
329                .config
330                .epoch_strategy
331                .last(self.current_epoch)
332                .expect("strategy is valid for all epochs and heights");
333
334            debug!(
335                %self.current_epoch,
336                %boundary_height,
337                "hinting to sync system that a finalization certificate might be \
338                available for our current epoch",
339            );
340
341            // In the event our network identity cannot verify this finalization,
342            // hint the boundary of the current epoch to progress.
343            self.config
344                .marshal
345                .hint_finalized(boundary_height, stub_peers.clone())
346                .await;
347
348            if let Some(one_before_boundary) = boundary_height.previous() {
349                self.config.marshal.set_floor(one_before_boundary).await;
350            }
351
352            let network_identity = self.config.network_identity.clone();
353            if finalization_epoch.get() < network_identity.from_epoch {
354                return Ok(());
355            }
356        }
357
358        let can_use_network_identity_fallback =
359            finalization_epoch.get() >= self.config.network_identity.from_epoch;
360
361        let scheme = match self.config.scheme_provider.scoped(finalization_epoch) {
362            Some(scheme) => scheme,
363            None if can_use_network_identity_fallback => self.network_scheme.clone(),
364            None => bail!(
365                "finalization epoch `{finalization_epoch}` behind network identity starting epoch `{}`; current epoch `{}`",
366                self.config.network_identity.from_epoch,
367                self.current_epoch
368            ),
369        };
370
371        // If we can accept this cert, jump to it and set the floor as the
372        // upstream may have pruned any intermediatery blocks.
373        if finalization.verify(&mut self.context, &scheme, &Sequential) {
374            let round = finalization.round();
375            let activity = Activity::Finalization(finalization);
376            if !self.config.marshal.verified(round, consensus_block).await {
377                warn_span!("follow_driver").in_scope(
378                    || warn!(?round, %height, "marshal refused to persist the verified block"),
379                )
380            }
381
382            if let Some(one_before_block) = height.previous() {
383                self.config.marshal.set_floor(one_before_block).await;
384            }
385
386            self.config.marshal.report(activity.clone()).await;
387            self.config.feed.report(activity).await;
388        } else {
389            debug!(%finalization_epoch, %height, "failed finalization certificate verification")
390        }
391
392        Ok(())
393    }
394
395    #[instrument(skip_all)]
396    async fn process_update(&mut self, update: marshal::Update<Block>) {
397        let marshal::Update::Block(block, ack) = update else {
398            return;
399        };
400
401        let epoch_info = self
402            .config
403            .epoch_strategy
404            .containing(block.height())
405            .expect("strategy valid for all heights");
406
407        if epoch_info.last() == block.height() {
408            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
409                &mut block.header().extra_data().as_ref(),
410            )
411            .expect("boundary blocks must contain DKG outcomes");
412
413            let network_identity = &self.config.network_identity;
414            if onchain_outcome.epoch.get() >= network_identity.from_epoch
415                && network_identity.identity != *onchain_outcome.network_identity()
416            {
417                warn!(
418                    compiled_from_epoch = network_identity.from_epoch,
419                    onchain_epoch = %onchain_outcome.epoch,
420                    compiled_network_identity = %network_identity.identity,
421                    onchain_network_identity = %onchain_outcome.network_identity(),
422                    "Network identity differs from the onchain DKG outcome!!! Update the binary with the latest network identity"
423                );
424            }
425
426            self.config.scheme_provider.register(
427                onchain_outcome.epoch,
428                Scheme::certificate_verifier(
429                    crate::config::NAMESPACE,
430                    *onchain_outcome.network_identity(),
431                ),
432            );
433
434            self.current_epoch = onchain_outcome.epoch;
435        } else {
436            // If not a boundary block, we may have fast forwarded
437            self.current_epoch = epoch_info.epoch();
438        }
439
440        ack.acknowledge();
441    }
442}