Skip to main content

tempo_commonware_node/feed/
state.rs

1//! Shared state for the feed module.
2
3use crate::alias::marshal;
4use alloy_consensus::BlockHeader as _;
5use alloy_primitives::hex;
6use commonware_codec::{Encode, ReadExt as _};
7use commonware_consensus::{
8    marshal::Identifier,
9    types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
10};
11use commonware_cryptography::bls12381::primitives::variant::{MinSig, Variant};
12use parking_lot::RwLock;
13use reth_node_core::rpc::compat::FromConsensusHeader;
14use reth_provider::HeaderProvider as _;
15use std::sync::{Arc, OnceLock};
16use tempo_alloy::rpc::TempoHeaderResponse;
17use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
18use tempo_node::{
19    TempoFullNode,
20    rpc::consensus::{
21        CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError,
22        IdentityTransition, IdentityTransitionResponse, Query, TransitionProofData,
23        types::Response,
24    },
25};
26use tokio::sync::broadcast;
27use tracing::{Level, instrument};
28
/// Capacity passed to `broadcast::channel` when the feed's event channel is
/// created in `FeedStateHandle::new`.
const BROADCAST_CHANNEL_SIZE: usize = 1024;
30
/// Internal shared state for the feed.
///
/// Both fields start out as `None` (see `FeedStateHandle::new`) and are
/// accessed through the read/write guards handed out by `FeedStateHandle`.
pub(super) struct FeedState {
    /// Latest notarized block, or `None` if none has been recorded yet.
    pub(super) latest_notarized: Option<CertifiedBlock>,
    /// Latest finalized block, or `None` if none has been recorded yet.
    pub(super) latest_finalized: Option<CertifiedBlock>,
}
38
/// Cached identity transition chain.
///
/// Stores transitions from a starting epoch back towards genesis.
/// Can be extended for newer epochs or subsectioned for older queries.
///
/// The cache is treated as complete only when `to_epoch == 0`; an incomplete
/// cache is re-walked from `to_epoch` the next time it is consulted.
#[derive(Clone)]
struct IdentityTransitionCache {
    /// The epoch from which the chain was built (inclusive upper bound).
    from_epoch: u64,
    /// Public key at `from_epoch`, i.e. the key read from the DKG outcome in
    /// the last block of the preceding epoch.
    from_pubkey: <MinSig as Variant>::Public,
    /// The earliest epoch we walked to (0 if we reached genesis).
    to_epoch: u64,
    /// The public key at `to_epoch`; used to resume the walk when the cache
    /// did not reach genesis.
    to_pubkey: <MinSig as Variant>::Public,
    /// Cached transitions, ordered newest to oldest. When the walk reached
    /// genesis, the final entry is a marker with `transition_epoch == 0` and
    /// no proof.
    transitions: Arc<Vec<IdentityTransition>>,
}
56
/// Handle to shared feed state.
///
/// This handle can be cloned and used by both:
/// - The feed actor (to update state when processing Activity)
/// - RPC handlers (implements `ConsensusFeed`)
///
/// All fields other than `events_tx` are wrapped in `Arc`, so clones of the
/// handle share them.
#[derive(Clone)]
pub struct FeedStateHandle {
    /// Latest notarized/finalized blocks, guarded by a read-write lock.
    state: Arc<RwLock<FeedState>>,
    /// Marshal mailbox; empty until `set_marshal` is called.
    marshal: Arc<OnceLock<marshal::Mailbox>>,
    /// Epocher used for epoch boundary calculations; empty until `set_epocher` is called.
    epocher: Arc<OnceLock<FixedEpocher>>,
    /// Execution node used for header lookups; empty until `set_execution_node` is called.
    execution_node: Arc<OnceLock<Arc<TempoFullNode>>>,
    /// Sending half of the broadcast channel that carries feed `Event`s.
    events_tx: broadcast::Sender<Event>,
    /// Cache for identity transition proofs to avoid re-walking the chain.
    identity_cache: Arc<RwLock<Option<IdentityTransitionCache>>>,
}
72
73impl FeedStateHandle {
74    /// Create a new feed state handle.
75    ///
76    /// The marshal mailbox can be set later using `set_marshal`.
77    /// Until set, historical finalization lookups will return `None`.
78    pub fn new() -> Self {
79        let (events_tx, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
80        Self {
81            state: Arc::new(RwLock::new(FeedState {
82                latest_notarized: None,
83                latest_finalized: None,
84            })),
85            marshal: Arc::new(OnceLock::new()),
86            epocher: Arc::new(OnceLock::new()),
87            execution_node: Arc::new(OnceLock::new()),
88            events_tx,
89            identity_cache: Arc::new(RwLock::new(None)),
90        }
91    }
92
93    /// Set the marshal mailbox for historical finalization lookups. Should only be called once.
94    pub(crate) fn set_marshal(&self, marshal: marshal::Mailbox) {
95        let _ = self.marshal.set(marshal);
96    }
97
98    /// Set the epocher for epoch boundary calculations. Should only be called once.
99    pub(crate) fn set_epocher(&self, epocher: FixedEpocher) {
100        let _ = self.epocher.set(epocher);
101    }
102
103    /// Set the execution node for header lookups. Should only be called once.
104    pub(crate) fn set_execution_node(&self, execution_node: Arc<TempoFullNode>) {
105        let _ = self.execution_node.set(execution_node);
106    }
107
    /// Get the broadcast sender for events.
    ///
    /// Returns a borrow of the handle's sender; no new channel is created.
    pub(super) fn events_tx(&self) -> &broadcast::Sender<Event> {
        &self.events_tx
    }
112
    /// Get read access to the internal state.
    ///
    /// The returned `parking_lot` guard keeps the read lock held until it is dropped.
    pub(super) fn read(&self) -> parking_lot::RwLockReadGuard<'_, FeedState> {
        self.state.read()
    }
117
    /// Get write access to the internal state.
    ///
    /// The returned `parking_lot` guard keeps the write lock held until it is dropped.
    pub(super) fn write(&self) -> parking_lot::RwLockWriteGuard<'_, FeedState> {
        self.state.write()
    }
122
123    /// Get the marshal mailbox, logging if not yet set.
124    fn marshal(&self) -> Option<marshal::Mailbox> {
125        let marshal = self.marshal.get().cloned();
126        if marshal.is_none() {
127            tracing::debug!("marshal not yet set");
128        }
129        marshal
130    }
131
132    /// Get the epocher, logging if not yet set.
133    fn epocher(&self) -> Option<FixedEpocher> {
134        let epocher = self.epocher.get().cloned();
135        if epocher.is_none() {
136            tracing::debug!("epocher not yet set");
137        }
138        epocher
139    }
140
141    /// Ensure the identity cache covers `start_epoch` by walking backwards
142    /// if needed. After this returns, the cache is guaranteed to contain
143    /// transition data covering `start_epoch` (as far back as available data allows).
144    #[instrument(skip_all, fields(start_epoch), err)]
145    async fn try_fill_transitions(
146        &self,
147        marshal: &mut marshal::Mailbox,
148        execution: &TempoFullNode,
149        epocher: &FixedEpocher,
150        start_epoch: u64,
151    ) -> Result<(), IdentityProofError> {
152        // Check if the cache already covers this epoch.
153        // If the cache is incomplete, skip the early return so we re-attempt
154        // the walk from where it previously stopped.
155        let cached = self.identity_cache.read().clone();
156        if let Some(cache) = &cached
157            && cache.to_epoch == 0
158            && (cache.to_epoch..=cache.from_epoch).contains(&start_epoch)
159        {
160            return Ok(());
161        }
162
163        // Identity active at epoch N is set by the last block of epoch N-1
164        let epoch_outcome = get_outcome(execution, epocher, start_epoch.saturating_sub(1))?;
165        let epoch_pubkey = *epoch_outcome.sharing().public();
166
167        // Fast path: if the identity matches the cached one and the cache is
168        // complete, just extend the upper bound — no new transitions needed.
169        if let Some(cache) = &cached
170            && start_epoch > cache.from_epoch
171            && cache.to_epoch == 0
172            && cache.from_pubkey == epoch_pubkey
173        {
174            let mut updated = cache.clone();
175            updated.from_epoch = start_epoch;
176            *self.identity_cache.write() = Some(updated);
177            return Ok(());
178        }
179
180        // Walk backwards to find all identity transitions
181        let mut transitions = Vec::new();
182        let mut pubkey = epoch_pubkey;
183        let mut search_epoch = start_epoch.saturating_sub(1);
184        while search_epoch > 0 {
185            // Absorb cached transitions. If the cache reached genesis we can
186            // stop; otherwise update pubkey and fall through to continue the
187            // walk from where the cache left off.
188            if let Some(cache) = &cached
189                && search_epoch < cache.from_epoch
190                && search_epoch > cache.to_epoch
191            {
192                transitions.extend(cache.transitions.iter().cloned());
193                search_epoch = cache.to_epoch;
194                if cache.to_epoch == 0 {
195                    break;
196                }
197
198                pubkey = cache.to_pubkey;
199            }
200
201            let prev_outcome = match get_outcome(execution, epocher, search_epoch - 1) {
202                Ok(outcome) => outcome,
203                Err(IdentityProofError::PrunedData(height)) => {
204                    tracing::info!(
205                        %height,
206                        search_epoch = search_epoch - 1,
207                        "stopping identity transition walk early (header not available)"
208                    );
209                    break;
210                }
211                Err(e) => return Err(e),
212            };
213
214            // If keys differ, there was a full DKG at search_epoch
215            let prev_pubkey = *prev_outcome.sharing().public();
216            if pubkey != prev_pubkey {
217                let height = epocher
218                    .last(Epoch::new(search_epoch))
219                    .expect("fixed epocher is valid for all epochs");
220
221                let Some(header) = execution
222                    .provider
223                    .sealed_header(height.get())
224                    .ok()
225                    .flatten()
226                else {
227                    tracing::info!(
228                        height = height.get(),
229                        search_epoch,
230                        "stopping identity transition walk early (header not available)"
231                    );
232                    break;
233                };
234
235                let Some(finalization) = marshal.get_finalization(height).await else {
236                    tracing::info!(
237                        height = height.get(),
238                        search_epoch,
239                        "stopping identity transition walk early (finalization pruned)"
240                    );
241                    break;
242                };
243
244                if finalization.proposal.payload.0 != header.hash() {
245                    return Err(IdentityProofError::MalformedData(height.get()));
246                }
247
248                transitions.push(IdentityTransition {
249                    transition_epoch: search_epoch,
250                    old_identity: hex::encode(prev_pubkey.encode()),
251                    new_identity: hex::encode(pubkey.encode()),
252                    proof: Some(TransitionProofData {
253                        header: TempoHeaderResponse::from_consensus_header(header, 0),
254                        finalization_certificate: hex::encode(finalization.encode()),
255                    }),
256                });
257            }
258
259            pubkey = prev_pubkey;
260            search_epoch -= 1;
261        }
262
263        // Append genesis identity as terminal marker when we reached it.
264        if search_epoch == 0 {
265            let has_genesis = transitions
266                .last()
267                .is_some_and(|t| t.transition_epoch == 0 && t.proof.is_none());
268
269            if !has_genesis {
270                match get_outcome(execution, epocher, 0) {
271                    Ok(genesis_outcome) => {
272                        let genesis_pubkey = *genesis_outcome.sharing().public();
273                        let genesis_identity = hex::encode(genesis_pubkey.encode());
274                        transitions.push(IdentityTransition {
275                            transition_epoch: 0,
276                            old_identity: genesis_identity.clone(),
277                            new_identity: genesis_identity,
278                            proof: None,
279                        });
280                    }
281                    Err(err) => {
282                        tracing::debug!(
283                            ?err,
284                            "failed to fetch genesis outcome; omitting genesis marker"
285                        );
286                    }
287                }
288            }
289        }
290
291        // Build updated cache. The walk absorbs cached transitions in the correct order.
292        // `pubkey` is the identity at the point where the walk stopped.
293        let new_cache = if let Some(c) = &cached {
294            let (from, from_pk) = if start_epoch >= c.from_epoch {
295                (start_epoch, epoch_pubkey)
296            } else {
297                (c.from_epoch, c.from_pubkey)
298            };
299
300            IdentityTransitionCache {
301                from_epoch: from,
302                from_pubkey: from_pk,
303                to_epoch: search_epoch,
304                to_pubkey: pubkey,
305                transitions: Arc::new(transitions),
306            }
307        } else {
308            IdentityTransitionCache {
309                from_epoch: start_epoch,
310                from_pubkey: epoch_pubkey,
311                to_epoch: search_epoch,
312                to_pubkey: pubkey,
313                transitions: Arc::new(transitions),
314            }
315        };
316
317        *self.identity_cache.write() = Some(new_cache);
318        Ok(())
319    }
320}
321
/// `Default` delegates to [`FeedStateHandle::new`].
impl Default for FeedStateHandle {
    fn default() -> Self {
        Self::new()
    }
}
327
328impl std::fmt::Debug for FeedStateHandle {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        let state = self.state.read();
331        f.debug_struct("FeedStateHandle")
332            .field("latest_notarized", &state.latest_notarized)
333            .field("latest_finalized", &state.latest_finalized)
334            .field("marshal_set", &self.marshal.get().is_some())
335            .field("execution_node_set", &self.execution_node.get().is_some())
336            .field("subscriber_count", &self.events_tx.receiver_count())
337            .finish()
338    }
339}
340
341impl ConsensusFeed for FeedStateHandle {
342    #[instrument(skip_all, fields(%query), ret(level = Level::DEBUG, Display))]
343    async fn get_finalization(&self, query: Query) -> Response<CertifiedBlock> {
344        match query {
345            Query::Latest => self
346                .state
347                .read()
348                .latest_finalized
349                .clone()
350                .map_or(Response::Missing("certifications"), Response::Success),
351            Query::Height(height) => 'process: {
352                let height = Height::new(height);
353                let Some(marshal) = self.marshal() else {
354                    break 'process Response::NotReady;
355                };
356
357                let Some(finalization) = marshal.get_finalization(height).await else {
358                    break 'process Response::Missing("certificate");
359                };
360                let Some(block) = marshal.get_block(height).await else {
361                    break 'process Response::Missing("block");
362                };
363
364                Response::Success(CertifiedBlock {
365                    epoch: finalization.proposal.round.epoch().get(),
366                    view: finalization.proposal.round.view().get(),
367                    block: block.into_inner().into_block(),
368                    digest: finalization.proposal.payload.0,
369                    certificate: hex::encode(finalization.encode()),
370                })
371            }
372        }
373    }
374
375    async fn get_latest(&self) -> ConsensusState {
376        let (finalized, mut notarized) = {
377            let state = self.state.read();
378            (
379                state.latest_finalized.clone(),
380                state.latest_notarized.clone(),
381            )
382        };
383
384        let finalized_round = finalized
385            .as_ref()
386            .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)));
387
388        let notarized_round = notarized
389            .as_ref()
390            .map(|n| Round::new(Epoch::new(n.epoch), View::new(n.view)));
391
392        // Only include the notarization if it is ahead.
393        if finalized_round.is_some_and(|f| notarized_round.is_none_or(|n| n <= f)) {
394            notarized = None;
395        }
396
397        ConsensusState {
398            finalized,
399            notarized,
400        }
401    }
402
    /// Subscribe to the feed's event broadcast channel.
    ///
    /// This implementation always returns `Some` with a new receiver created
    /// from the handle's sender.
    async fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
        Some(self.events_tx.subscribe())
    }
406
407    async fn get_identity_transition_proof(
408        &self,
409        from_epoch: Option<u64>,
410        full: bool,
411    ) -> Result<IdentityTransitionResponse, IdentityProofError> {
412        let Some((mut marshal, epocher)) = self.marshal().zip(self.epocher()) else {
413            return Err(IdentityProofError::NotReady);
414        };
415        let Some(execution_node) = self.execution_node.get() else {
416            return Err(IdentityProofError::NotReady);
417        };
418
419        // Determine starting epoch (from param, or latest finalized)
420        let start_epoch = if let Some(epoch) = from_epoch {
421            epoch
422        } else {
423            marshal
424                .get_info(Identifier::Latest)
425                .await
426                .and_then(|(h, _)| epocher.containing(h))
427                .ok_or(IdentityProofError::NotReady)?
428                .epoch()
429                .get()
430        };
431
432        // Ensure cached transitions are up to date
433        self.try_fill_transitions(&mut marshal, execution_node, &epocher, start_epoch)
434            .await?;
435
436        let cache = self
437            .identity_cache
438            .read()
439            .clone()
440            .ok_or(IdentityProofError::NotReady)?;
441
442        // Filter transitions to only include those at or before start_epoch
443        let transitions: Vec<_> = cache
444            .transitions
445            .iter()
446            .filter(|t| t.transition_epoch <= start_epoch)
447            .cloned()
448            .collect();
449
450        // Determine identity at start_epoch by finding the closest transition
451        // AFTER start_epoch and using its old_identity (the key before that change).
452        // Transitions are newest-to-oldest, so the last match is the closest.
453        let identity = cache
454            .transitions
455            .iter()
456            .filter(|t| t.transition_epoch > start_epoch)
457            .last()
458            .map(|t| t.old_identity.clone())
459            .unwrap_or_else(|| hex::encode(cache.from_pubkey.encode()));
460
461        // If not full, only return the most recent real transition (exclude genesis marker)
462        let transitions = if full {
463            transitions
464        } else {
465            transitions
466                .into_iter()
467                .filter(|t| t.transition_epoch > 0)
468                .take(1)
469                .collect()
470        };
471
472        Ok(IdentityTransitionResponse {
473            identity,
474            transitions,
475        })
476    }
477}
478
479/// Fetch last block of epoch and decode DKG outcome.
480fn get_outcome(
481    execution: &TempoFullNode,
482    epocher: &FixedEpocher,
483    epoch: u64,
484) -> Result<OnchainDkgOutcome, IdentityProofError> {
485    let height = epocher
486        .last(Epoch::new(epoch))
487        .expect("fixed epocher is valid for all epochs");
488
489    let header = execution
490        .provider
491        .header_by_number(height.get())
492        .ok()
493        .flatten()
494        .ok_or(IdentityProofError::PrunedData(height.get()))?;
495
496    OnchainDkgOutcome::read(&mut header.extra_data().as_ref())
497        .map_err(|_| IdentityProofError::MalformedData(height.get()))
498}