Skip to main content

tempo_commonware_node/follow/
driver.rs

1//! Follower sync driver.
2//!
3//! Subscribes to upstream finalization events and processes epoch boundary
4//! blocks for DKG scheme extraction. Non-boundary blocks are synced by Reth
5//! via P2P and fetched by marshal's gap-repair resolver on demand.
6
7use std::sync::Arc;
8
9use alloy_consensus::BlockHeader as _;
10use commonware_codec::{DecodeExt as _, ReadExt as _};
11use commonware_consensus::{
12    Epochable, Heightable as _, Reporter, marshal,
13    simplex::{
14        scheme::bls12381_threshold::vrf::Scheme,
15        types::{Activity, Finalization},
16    },
17    types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
18};
19use commonware_cryptography::{
20    Signer as _,
21    bls12381::primitives::variant::MinSig,
22    ed25519::{self, PublicKey},
23};
24use commonware_math::algebra::Random as _;
25use commonware_runtime::{Clock, ContextCell, Spawner, spawn_cell};
26use commonware_utils::{Acknowledgement, vec::NonEmptyVec};
27use rand_08::{CryptoRng, Rng};
28
29use eyre::{OptionExt as _, Report, WrapErr as _};
30use reth_node_core::primitives::SealedBlock;
31use reth_provider::HeaderProvider as _;
32use tempo_node::{TempoFullNode, rpc::consensus::Event};
33use tokio::{select, sync::mpsc};
34use tracing::{debug, instrument, warn, warn_span};
35
36use crate::{
37    consensus::{Digest, block::Block},
38    epoch::SchemeProvider,
39    feed,
40};
41
42pub(super) fn try_init<TContext>(
43    context: TContext,
44    config: Config,
45) -> eyre::Result<(Driver<TContext>, Mailbox)> {
46    let (tx, rx) = mpsc::unbounded_channel();
47    let mailbox = Mailbox(tx);
48
49    // Use the last boundary block available in the execution layer as the
50    // trusted starting point.
51    //
52    // TODO: Provide a certificate with the latest boundary to not just trust
53    // but also verify.
54    let last_finalized_number = config
55        .execution_node
56        .provider
57        .canonical_in_memory_state()
58        .get_finalized_num_hash()
59        .map_or(0u64, |num_hash| num_hash.number);
60
61    let epoch_info = config
62        .epoch_strategy
63        .containing(Height::new(last_finalized_number))
64        .expect("strategy valid for all heights and epochs");
65
66    let last_boundary = if epoch_info.last().get() == last_finalized_number {
67        epoch_info.last()
68    } else if let Some(previous) = epoch_info.epoch().previous() {
69        config
70            .epoch_strategy
71            .last(previous)
72            .expect("strategy valid for all heights and epochs")
73    } else {
74        Height::zero()
75    };
76    let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
77        &mut config
78            .execution_node
79            .provider
80            .header_by_number(last_boundary.get())
81            .map_err(Report::new)
82            .and_then(|maybe_header| maybe_header.ok_or_eyre("execution layer did not have header"))
83            .wrap_err_with(|| {
84                format!(
85                    "cannot establish baseline - unable to read the header \
86                    from the last boundary block at height `{last_boundary}` \
87                    from the execution layer"
88                )
89            })?
90            .extra_data()
91            .as_ref(),
92    )
93    .wrap_err("the genesis header did not contain a DKG outcome")?;
94
95    config.scheme_provider.register(
96        onchain_outcome.epoch,
97        Scheme::certificate_verifier(
98            crate::config::NAMESPACE,
99            *onchain_outcome.sharing().public(),
100        ),
101    );
102
103    let actor = Driver {
104        context: ContextCell::new(context),
105        config,
106        mailbox: rx,
107        current_epoch: epoch_info.epoch(),
108        last_boundary,
109    };
110    Ok((actor, mailbox))
111}
112
113pub(super) struct Config {
114    pub(super) execution_node: Arc<TempoFullNode>,
115    pub(super) scheme_provider: SchemeProvider,
116
117    // TODO: What to do with this information?
118    pub(super) last_finalized_height: Height,
119
120    pub(super) marshal: crate::alias::marshal::Mailbox,
121    pub(super) feed: feed::Mailbox,
122    pub(super) epoch_strategy: FixedEpocher,
123}
124
125#[derive(Debug)]
126enum Message {
127    Event(Event),
128    Finalized(marshal::Update<Block>),
129}
130
131impl From<Event> for Message {
132    fn from(value: Event) -> Self {
133        Self::Event(value)
134    }
135}
136
137impl From<marshal::Update<Block>> for Message {
138    fn from(value: marshal::Update<Block>) -> Self {
139        Self::Finalized(value)
140    }
141}
142
143#[derive(Clone)]
144pub(super) struct Mailbox(mpsc::UnboundedSender<Message>);
145
146impl Mailbox {
147    pub(super) fn to_event_reporter(&self) -> EventReporter {
148        EventReporter(self.clone())
149    }
150
151    pub(super) fn to_marshal_reporter(&self) -> MarshalReporter {
152        MarshalReporter(self.clone())
153    }
154
155    fn send(&self, msg: impl Into<Message>) {
156        let _ = self.0.send(msg.into());
157    }
158}
159
160#[derive(Clone)]
161pub(super) struct EventReporter(Mailbox);
162
163impl Reporter for EventReporter {
164    type Activity = Event;
165
166    async fn report(&mut self, activity: Self::Activity) {
167        self.0.send(activity);
168    }
169}
170
171#[derive(Clone)]
172pub(super) struct MarshalReporter(Mailbox);
173
174impl Reporter for MarshalReporter {
175    type Activity = marshal::Update<Block>;
176
177    async fn report(&mut self, activity: Self::Activity) {
178        self.0.send(activity);
179    }
180}
181
182pub(super) struct Driver<TContext> {
183    context: ContextCell<TContext>,
184    config: Config,
185    mailbox: mpsc::UnboundedReceiver<Message>,
186
187    last_boundary: Height,
188    current_epoch: Epoch,
189}
190
191impl<C: Clock + Rng + CryptoRng> Driver<C>
192where
193    C: Spawner,
194{
195    pub(super) fn start(mut self) -> commonware_runtime::Handle<()> {
196        spawn_cell!(self.context, self.run())
197    }
198
199    async fn run(mut self) {
200        self.config.marshal.set_floor(self.last_boundary).await;
201        if self.heal_gap().await.is_err() {
202            return;
203        };
204
205        loop {
206            select!(
207                biased;
208
209                Some(message) = self.mailbox.recv() => {
210                    match message {
211                        Message::Event(event) => {
212                            // Emits an event on error.
213                            let _: Result<_, _> = self.process_event(event).await;
214                        }
215                        Message::Finalized(update) => {
216                            self.process_update(update).await;
217                        }
218                    }
219                }
220            );
221        }
222    }
223
224    /// Fills in the missing scheme if the execution layer did not persist.
225    #[instrument(skip_all, err(Display))]
226    async fn heal_gap(&mut self) -> eyre::Result<()> {
227        let current_consensus_epoch = self
228            .config
229            .epoch_strategy
230            .containing(self.config.last_finalized_height)
231            .expect("strategy is valid for all heights and epochs");
232        let current_execution_epoch = self
233            .config
234            .epoch_strategy
235            .containing(self.last_boundary)
236            .expect("strategy is valid for all heights and epochs");
237
238        if let Some(previous) = current_consensus_epoch.epoch().previous()
239            && previous > current_execution_epoch.epoch()
240        {
241            let last_consensus_boundary = self
242                .config
243                .epoch_strategy
244                .last(previous)
245                .expect("strategy is valid for all heights and epochs");
246            let boundary_block = self
247                .config
248                .marshal
249                .get_block(last_consensus_boundary)
250                .await
251                .ok_or_else(|| {
252                    eyre::eyre!(
253                        "cannot heal finalization gap; consensus layer is \
254                        ahead of execution layer, but consensus layer does not \
255                        have boundary block at height \
256                        `{last_consensus_boundary}`"
257                    )
258                })?;
259
260            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
261                &mut boundary_block.header().extra_data().as_ref(),
262            )
263            .wrap_err_with(|| {
264                format!(
265                    "the boundary block at height `{last_consensus_boundary}` \
266                contained no or a malformed DKG outcome"
267                )
268            })?;
269
270            self.config.scheme_provider.register(
271                onchain_outcome.epoch,
272                Scheme::certificate_verifier(
273                    crate::config::NAMESPACE,
274                    *onchain_outcome.sharing().public(),
275                ),
276            );
277        } else {
278            debug!("no gap detected");
279        }
280
281        Ok(())
282    }
283
284    #[instrument(skip_all, err(Display))]
285    async fn process_event(&mut self, event: Event) -> eyre::Result<()> {
286        let Event::Finalized {
287            block: certified, ..
288        } = event
289        else {
290            return Ok(());
291        };
292
293        // TODO: ensure well-formedness at the type level so we don't need extra
294        // decoding here.
295        let finalization = alloy_primitives::hex::decode(&certified.certificate)
296            .map_err(Report::new)
297            .and_then(|bytes| {
298                Finalization::<Scheme<PublicKey, MinSig>, Digest>::decode(&*bytes)
299                    .map_err(Report::new)
300            })
301            .wrap_err("event contained a malformed finalization certificate")?;
302
303        if finalization.epoch() > self.current_epoch {
304            let boundary_height = self
305                .config
306                .epoch_strategy
307                .last(self.current_epoch)
308                .expect("strategy is valid for all epochs and heights");
309
310            self.config
311                .marshal
312                .hint_finalized(
313                    boundary_height,
314                    // XXX: we know for a fact that the resolver used by the marshal
315                    // actor ignores the target, so we just give it a dummy key.
316                    NonEmptyVec::new(ed25519::PrivateKey::random(&mut self.context).public_key()),
317                )
318                .await;
319
320            if let Some(one_before_boundary) = boundary_height.previous() {
321                self.config.marshal.set_floor(one_before_boundary).await;
322            }
323
324            return Ok(());
325        }
326
327        let height = certified.block.number();
328        let consensus_block = Block::from_execution_block(SealedBlock::seal_slow(certified.block));
329
330        // Store the Finalized Block
331        let round = Round::new(Epoch::new(certified.epoch), View::new(certified.view));
332        let activity = Activity::Finalization(finalization);
333        if !self.config.marshal.verified(round, consensus_block).await {
334            warn_span!("follow_driver").in_scope(
335                || warn!(?round, %height, "marshal refused to persist the verified block"),
336            )
337        }
338
339        self.config.marshal.report(activity.clone()).await;
340        self.config.feed.report(activity).await;
341        Ok(())
342    }
343
344    #[instrument(skip_all)]
345    async fn process_update(&mut self, update: marshal::Update<Block>) {
346        let marshal::Update::Block(block, ack) = update else {
347            return;
348        };
349        let epoch_info = self
350            .config
351            .epoch_strategy
352            .containing(block.height())
353            .expect("strategy valid for all heights");
354        if epoch_info.last() == block.height() {
355            let onchain_outcome = tempo_dkg_onchain_artifacts::OnchainDkgOutcome::read(
356                &mut block.header().extra_data().as_ref(),
357            )
358            .expect("boundary blocks must contain DKG outcomes");
359            self.config.scheme_provider.register(
360                onchain_outcome.epoch,
361                Scheme::certificate_verifier(
362                    crate::config::NAMESPACE,
363                    *onchain_outcome.network_identity(),
364                ),
365            );
366            self.current_epoch = onchain_outcome.epoch;
367        }
368        ack.acknowledge();
369    }
370}