Skip to main content

tempo_consensus/feed/
state.rs

1//! Shared state for the feed module.
2
3use crate::{alias::marshal, network_identity::NetworkIdentity};
4use alloy_consensus::BlockHeader as _;
5use alloy_primitives::hex;
6use commonware_codec::{Encode, ReadExt as _};
7use commonware_consensus::{
8    marshal::Identifier,
9    types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
10};
11use parking_lot::RwLock;
12use reth_node_core::rpc::compat::FromConsensusHeader;
13use reth_provider::HeaderProvider as _;
14use std::sync::{Arc, OnceLock};
15use tempo_alloy::rpc::TempoHeaderResponse;
16use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
17use tempo_node::{
18    TempoFullNode,
19    rpc::consensus::{
20        CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError,
21        IdentityTransition, IdentityTransitionResponse, Query, TransitionProofData,
22        types::Response,
23    },
24};
25use tokio::sync::broadcast;
26use tracing::{Level, instrument};
27
28const BROADCAST_CHANNEL_SIZE: usize = 1024;
29
30/// Internal shared state for the feed.
31pub(super) struct FeedState {
32    /// Latest notarized block.
33    pub(super) latest_notarized: Option<CertifiedBlock>,
34    /// Latest finalized block.
35    pub(super) latest_finalized: Option<CertifiedBlock>,
36}
37
38/// Cached identity transition chain.
39///
40/// Stores transitions from a starting epoch back towards genesis.
41/// Can be extended for newer epochs or subsectioned for older queries.
42#[derive(Clone)]
43struct IdentityTransitionCache {
44    /// The epoch from which the chain was built (inclusive).
45    from_epoch: u64,
46    /// NetworkIdentity at `from_epoch`.
47    from_identity: NetworkIdentity,
48    /// The earliest epoch we walked to (0 if we reached genesis).
49    to_epoch: u64,
50    /// NetworkIdentity at `to_epoch`.
51    to_identity: NetworkIdentity,
52    /// Cached transitions, ordered newest to oldest.
53    transitions: Arc<Vec<IdentityTransition>>,
54}
55
56/// Handle to shared feed state.
57///
58/// This handle can be cloned and used by both:
59/// - The feed actor (to update state when processing Activity)
60/// - RPC handlers (implements `ConsensusFeed`)
61#[derive(Clone)]
62pub struct FeedStateHandle {
63    state: Arc<RwLock<FeedState>>,
64    marshal: Arc<OnceLock<marshal::Mailbox>>,
65    epocher: Arc<OnceLock<FixedEpocher>>,
66    execution_node: Arc<OnceLock<Arc<TempoFullNode>>>,
67    events_tx: broadcast::Sender<Event>,
68    /// Cache for identity transition proofs to avoid re-walking the chain.
69    identity_cache: Arc<RwLock<Option<IdentityTransitionCache>>>,
70}
71
72impl FeedStateHandle {
73    /// Create a new feed state handle.
74    ///
75    /// The marshal mailbox can be set later using `set_marshal`.
76    /// Until set, historical finalization lookups will return `None`.
77    pub fn new() -> Self {
78        let (events_tx, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
79        Self {
80            state: Arc::new(RwLock::new(FeedState {
81                latest_notarized: None,
82                latest_finalized: None,
83            })),
84            marshal: Arc::new(OnceLock::new()),
85            epocher: Arc::new(OnceLock::new()),
86            execution_node: Arc::new(OnceLock::new()),
87            events_tx,
88            identity_cache: Arc::new(RwLock::new(None)),
89        }
90    }
91
92    /// Set the marshal mailbox for historical finalization lookups. Should only be called once.
93    pub(crate) fn set_marshal(&self, marshal: marshal::Mailbox) {
94        let _ = self.marshal.set(marshal);
95    }
96
97    /// Set the epocher for epoch boundary calculations. Should only be called once.
98    pub(crate) fn set_epocher(&self, epocher: FixedEpocher) {
99        let _ = self.epocher.set(epocher);
100    }
101
102    /// Set the execution node for header lookups. Should only be called once.
103    pub(crate) fn set_execution_node(&self, execution_node: Arc<TempoFullNode>) {
104        let _ = self.execution_node.set(execution_node);
105    }
106
107    /// Get the broadcast sender for events.
108    pub(super) fn events_tx(&self) -> &broadcast::Sender<Event> {
109        &self.events_tx
110    }
111
112    /// Get read access to the internal state.
113    pub(super) fn read(&self) -> parking_lot::RwLockReadGuard<'_, FeedState> {
114        self.state.read()
115    }
116
117    /// Get write access to the internal state.
118    pub(super) fn write(&self) -> parking_lot::RwLockWriteGuard<'_, FeedState> {
119        self.state.write()
120    }
121
122    /// Get the marshal mailbox, logging if not yet set.
123    fn marshal(&self) -> Option<marshal::Mailbox> {
124        let marshal = self.marshal.get().cloned();
125        if marshal.is_none() {
126            tracing::debug!("marshal not yet set");
127        }
128        marshal
129    }
130
131    /// Get the epocher, logging if not yet set.
132    fn epocher(&self) -> Option<FixedEpocher> {
133        let epocher = self.epocher.get().cloned();
134        if epocher.is_none() {
135            tracing::debug!("epocher not yet set");
136        }
137        epocher
138    }
139
140    /// Ensure the identity cache covers `start_epoch` by walking backwards
141    /// if needed. After this returns, the cache is guaranteed to contain
142    /// transition data covering `start_epoch` (as far back as available data allows).
143    #[instrument(skip_all, fields(start_epoch), err)]
144    async fn try_fill_transitions(
145        &self,
146        marshal: &mut marshal::Mailbox,
147        execution: &TempoFullNode,
148        epocher: &FixedEpocher,
149        start_epoch: u64,
150    ) -> Result<(), IdentityProofError> {
151        // Check if the cache already covers this epoch.
152        // If the cache is incomplete, skip the early return so we re-attempt
153        // the walk from where it previously stopped.
154        let cached = self.identity_cache.read().clone();
155        if let Some(cache) = &cached
156            && cache.to_epoch == 0
157            && (cache.to_epoch..=cache.from_epoch).contains(&start_epoch)
158        {
159            return Ok(());
160        }
161
162        // Identity active at epoch N is set by the last block of epoch N-1
163        let epoch_outcome = get_outcome(execution, epocher, start_epoch.saturating_sub(1))?;
164        let epoch_pubkey = *epoch_outcome.sharing().public();
165
166        // Fast path: if the identity matches the cached one and the cache is
167        // complete, just extend the upper bound — no new transitions needed.
168        if let Some(cache) = &cached
169            && start_epoch > cache.from_epoch
170            && cache.to_epoch == 0
171            && cache.from_identity.public_key() == epoch_pubkey
172        {
173            let mut updated = cache.clone();
174            updated.from_epoch = start_epoch;
175            *self.identity_cache.write() = Some(updated);
176            return Ok(());
177        }
178
179        // Walk backwards to find all identity transitions
180        let mut transitions = Vec::new();
181        let mut pubkey = epoch_pubkey;
182        let mut search_epoch = start_epoch.saturating_sub(1);
183        while search_epoch > 0 {
184            // Absorb cached transitions. If the cache reached genesis we can
185            // stop; otherwise update pubkey and fall through to continue the
186            // walk from where the cache left off.
187            if let Some(cache) = &cached
188                && search_epoch < cache.from_epoch
189                && search_epoch > cache.to_epoch
190            {
191                transitions.extend(cache.transitions.iter().cloned());
192                search_epoch = cache.to_epoch;
193                if cache.to_epoch == 0 {
194                    break;
195                }
196
197                pubkey = cache.to_identity.public_key();
198            }
199
200            let prev_outcome = match get_outcome(execution, epocher, search_epoch - 1) {
201                Ok(outcome) => outcome,
202                Err(IdentityProofError::PrunedData(height)) => {
203                    tracing::info!(
204                        %height,
205                        search_epoch = search_epoch - 1,
206                        "stopping identity transition walk early (header not available)"
207                    );
208                    break;
209                }
210                Err(e) => return Err(e),
211            };
212
213            // If keys differ, there was a full DKG at search_epoch
214            let prev_pubkey = *prev_outcome.sharing().public();
215            if pubkey != prev_pubkey {
216                let height = epocher
217                    .last(Epoch::new(search_epoch))
218                    .expect("fixed epocher is valid for all epochs");
219
220                let Some(header) = execution
221                    .provider
222                    .sealed_header(height.get())
223                    .ok()
224                    .flatten()
225                else {
226                    tracing::info!(
227                        height = height.get(),
228                        search_epoch,
229                        "stopping identity transition walk early (header not available)"
230                    );
231                    break;
232                };
233
234                let Some(finalization) = marshal.get_finalization(height).await else {
235                    tracing::info!(
236                        height = height.get(),
237                        search_epoch,
238                        "stopping identity transition walk early (finalization pruned)"
239                    );
240                    break;
241                };
242
243                if finalization.proposal.payload.0 != header.hash() {
244                    return Err(IdentityProofError::MalformedData(height.get()));
245                }
246
247                transitions.push(IdentityTransition {
248                    transition_epoch: search_epoch,
249                    old_identity: hex::encode(prev_pubkey.encode()),
250                    new_identity: hex::encode(pubkey.encode()),
251                    proof: Some(TransitionProofData {
252                        header: TempoHeaderResponse::from_consensus_header(header, 0),
253                        finalization_certificate: hex::encode(finalization.encode()),
254                    }),
255                });
256            }
257
258            pubkey = prev_pubkey;
259            search_epoch -= 1;
260        }
261
262        // Append genesis identity as terminal marker when we reached it.
263        if search_epoch == 0 {
264            let has_genesis = transitions
265                .last()
266                .is_some_and(|t| t.transition_epoch == 0 && t.proof.is_none());
267
268            if !has_genesis {
269                match get_outcome(execution, epocher, 0) {
270                    Ok(genesis_outcome) => {
271                        let genesis_pubkey = *genesis_outcome.sharing().public();
272                        let genesis_identity = hex::encode(genesis_pubkey.encode());
273                        transitions.push(IdentityTransition {
274                            transition_epoch: 0,
275                            old_identity: genesis_identity.clone(),
276                            new_identity: genesis_identity,
277                            proof: None,
278                        });
279                    }
280                    Err(err) => {
281                        tracing::debug!(
282                            ?err,
283                            "failed to fetch genesis outcome; omitting genesis marker"
284                        );
285                    }
286                }
287            }
288        }
289
290        // Build updated cache. The walk absorbs cached transitions in the correct order.
291        // `pubkey` is the identity at the point where the walk stopped.
292        let new_cache = if let Some(c) = &cached {
293            let (from, from_pubkey) = if start_epoch >= c.from_epoch {
294                (start_epoch, epoch_pubkey)
295            } else {
296                (c.from_epoch, c.from_identity.public_key())
297            };
298
299            IdentityTransitionCache {
300                from_epoch: from,
301                from_identity: NetworkIdentity(from_pubkey),
302                to_epoch: search_epoch,
303                to_identity: NetworkIdentity(pubkey),
304                transitions: Arc::new(transitions),
305            }
306        } else {
307            IdentityTransitionCache {
308                from_epoch: start_epoch,
309                from_identity: NetworkIdentity(epoch_pubkey),
310                to_epoch: search_epoch,
311                to_identity: NetworkIdentity(pubkey),
312                transitions: Arc::new(transitions),
313            }
314        };
315
316        *self.identity_cache.write() = Some(new_cache);
317        Ok(())
318    }
319}
320
321impl Default for FeedStateHandle {
322    fn default() -> Self {
323        Self::new()
324    }
325}
326
327impl std::fmt::Debug for FeedStateHandle {
328    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
329        let state = self.state.read();
330        f.debug_struct("FeedStateHandle")
331            .field("latest_notarized", &state.latest_notarized)
332            .field("latest_finalized", &state.latest_finalized)
333            .field("marshal_set", &self.marshal.get().is_some())
334            .field("execution_node_set", &self.execution_node.get().is_some())
335            .field("subscriber_count", &self.events_tx.receiver_count())
336            .finish()
337    }
338}
339
340impl ConsensusFeed for FeedStateHandle {
341    #[instrument(skip_all, fields(%query), ret(level = Level::DEBUG, Display))]
342    async fn get_finalization(&self, query: Query) -> Response<CertifiedBlock> {
343        match query {
344            Query::Latest => self
345                .state
346                .read()
347                .latest_finalized
348                .clone()
349                .map_or(Response::Missing("certifications"), Response::Success),
350            Query::Height(height) => 'process: {
351                let height = Height::new(height);
352                let Some(marshal) = self.marshal() else {
353                    break 'process Response::NotReady;
354                };
355
356                let Some(finalization) = marshal.get_finalization(height).await else {
357                    break 'process Response::Missing("certificate");
358                };
359                let Some(block) = marshal.get_block(height).await else {
360                    break 'process Response::Missing("block");
361                };
362
363                Response::Success(CertifiedBlock {
364                    epoch: finalization.proposal.round.epoch().get(),
365                    view: finalization.proposal.round.view().get(),
366                    block: block.into_inner().into_block(),
367                    digest: finalization.proposal.payload.0,
368                    certificate: hex::encode(finalization.encode()),
369                })
370            }
371        }
372    }
373
374    async fn get_latest(&self) -> ConsensusState {
375        let (finalized, mut notarized) = {
376            let state = self.state.read();
377            (
378                state.latest_finalized.clone(),
379                state.latest_notarized.clone(),
380            )
381        };
382
383        let finalized_round = finalized
384            .as_ref()
385            .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)));
386
387        let notarized_round = notarized
388            .as_ref()
389            .map(|n| Round::new(Epoch::new(n.epoch), View::new(n.view)));
390
391        // Only include the notarization if it is ahead.
392        if finalized_round.is_some_and(|f| notarized_round.is_none_or(|n| n <= f)) {
393            notarized = None;
394        }
395
396        ConsensusState {
397            finalized,
398            notarized,
399        }
400    }
401
402    async fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
403        Some(self.events_tx.subscribe())
404    }
405
406    async fn get_identity_transition_proof(
407        &self,
408        from_epoch: Option<u64>,
409        full: bool,
410    ) -> Result<IdentityTransitionResponse, IdentityProofError> {
411        let Some((mut marshal, epocher)) = self.marshal().zip(self.epocher()) else {
412            return Err(IdentityProofError::NotReady);
413        };
414        let Some(execution_node) = self.execution_node.get() else {
415            return Err(IdentityProofError::NotReady);
416        };
417
418        // Determine starting epoch (from param, or latest finalized)
419        let start_epoch = if let Some(epoch) = from_epoch {
420            epoch
421        } else {
422            marshal
423                .get_info(Identifier::Latest)
424                .await
425                .and_then(|(h, _)| epocher.containing(h))
426                .ok_or(IdentityProofError::NotReady)?
427                .epoch()
428                .get()
429        };
430
431        // Ensure cached transitions are up to date
432        self.try_fill_transitions(&mut marshal, execution_node, &epocher, start_epoch)
433            .await?;
434
435        let cache = self
436            .identity_cache
437            .read()
438            .clone()
439            .ok_or(IdentityProofError::NotReady)?;
440
441        // Filter transitions to only include those at or before start_epoch
442        let transitions: Vec<_> = cache
443            .transitions
444            .iter()
445            .filter(|t| t.transition_epoch <= start_epoch)
446            .cloned()
447            .collect();
448
449        // Determine identity at start_epoch by finding the closest transition
450        // AFTER start_epoch and using its old_identity (the key before that change).
451        // Transitions are newest-to-oldest, so the last match is the closest.
452        let identity = cache
453            .transitions
454            .iter()
455            .filter(|t| t.transition_epoch > start_epoch)
456            .last()
457            .map(|t| t.old_identity.clone())
458            .unwrap_or_else(|| hex::encode(cache.from_identity.public_key().encode()));
459
460        // If not full, only return the most recent real transition (exclude genesis marker)
461        let transitions = if full {
462            transitions
463        } else {
464            transitions
465                .into_iter()
466                .filter(|t| t.transition_epoch > 0)
467                .take(1)
468                .collect()
469        };
470
471        Ok(IdentityTransitionResponse {
472            identity,
473            transitions,
474        })
475    }
476}
477
478/// Fetch last block of epoch and decode DKG outcome.
479fn get_outcome(
480    execution: &TempoFullNode,
481    epocher: &FixedEpocher,
482    epoch: u64,
483) -> Result<OnchainDkgOutcome, IdentityProofError> {
484    let height = epocher
485        .last(Epoch::new(epoch))
486        .expect("fixed epocher is valid for all epochs");
487
488    let header = execution
489        .provider
490        .header_by_number(height.get())
491        .ok()
492        .flatten()
493        .ok_or(IdentityProofError::PrunedData(height.get()))?;
494
495    OnchainDkgOutcome::read(&mut header.extra_data().as_ref())
496        .map_err(|_| IdentityProofError::MalformedData(height.get()))
497}