Skip to main content

tempo_commonware_node/feed/
state.rs

1//! Shared state for the feed module.
2
3use crate::alias::marshal;
4use alloy_consensus::BlockHeader as _;
5use alloy_primitives::hex;
6use commonware_codec::{Encode, ReadExt as _};
7use commonware_consensus::{
8    marshal::Identifier,
9    types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
10};
11use commonware_cryptography::bls12381::primitives::variant::{MinSig, Variant};
12use parking_lot::RwLock;
13use reth_provider::HeaderProvider as _;
14use reth_rpc_convert::transaction::FromConsensusHeader;
15use std::sync::{Arc, OnceLock};
16use tempo_alloy::rpc::TempoHeaderResponse;
17use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
18use tempo_node::{
19    TempoFullNode,
20    rpc::consensus::{
21        CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError,
22        IdentityTransition, IdentityTransitionResponse, Query, TransitionProofData,
23    },
24};
25use tokio::sync::broadcast;
26use tracing::instrument;
27
/// Capacity passed to `broadcast::channel` when the event channel is created.
const BROADCAST_CHANNEL_SIZE: usize = 1 << 10;
29
30/// Internal shared state for the feed.
31pub(super) struct FeedState {
32    /// Latest notarized block.
33    pub(super) latest_notarized: Option<CertifiedBlock>,
34    /// Latest finalized block.
35    pub(super) latest_finalized: Option<CertifiedBlock>,
36}
37
38/// Cached identity transition chain.
39///
40/// Stores transitions from a starting epoch back towards genesis.
41/// Can be extended for newer epochs or subsectioned for older queries.
42#[derive(Clone, Default)]
43struct IdentityTransitionCache {
44    /// The epoch from which the chain was built (inclusive).
45    from_epoch: u64,
46    /// The earliest epoch we walked to (0 if we reached genesis).
47    to_epoch: u64,
48    /// Identity at `from_epoch`.
49    identity: String,
50    /// Cached transitions, ordered newest to oldest.
51    transitions: Arc<Vec<IdentityTransition>>,
52}
53
54/// Handle to shared feed state.
55///
56/// This handle can be cloned and used by both:
57/// - The feed actor (to update state when processing Activity)
58/// - RPC handlers (implements `ConsensusFeed`)
59#[derive(Clone)]
60pub struct FeedStateHandle {
61    state: Arc<RwLock<FeedState>>,
62    marshal: Arc<OnceLock<marshal::Mailbox>>,
63    epocher: Arc<OnceLock<FixedEpocher>>,
64    execution_node: Arc<OnceLock<TempoFullNode>>,
65    events_tx: broadcast::Sender<Event>,
66    /// Cache for identity transition proofs to avoid re-walking the chain.
67    identity_cache: Arc<RwLock<Option<IdentityTransitionCache>>>,
68}
69
70impl FeedStateHandle {
71    /// Create a new feed state handle.
72    ///
73    /// The marshal mailbox can be set later using `set_marshal`.
74    /// Until set, historical finalization lookups will return `None`.
75    pub fn new() -> Self {
76        let (events_tx, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
77        Self {
78            state: Arc::new(RwLock::new(FeedState {
79                latest_notarized: None,
80                latest_finalized: None,
81            })),
82            marshal: Arc::new(OnceLock::new()),
83            epocher: Arc::new(OnceLock::new()),
84            execution_node: Arc::new(OnceLock::new()),
85            events_tx,
86            identity_cache: Arc::new(RwLock::new(None)),
87        }
88    }
89
90    /// Set the marshal mailbox for historical finalization lookups. Should only be called once.
91    pub(crate) fn set_marshal(&self, marshal: marshal::Mailbox) {
92        let _ = self.marshal.set(marshal);
93    }
94
95    /// Set the epocher for epoch boundary calculations. Should only be called once.
96    pub(crate) fn set_epocher(&self, epocher: FixedEpocher) {
97        let _ = self.epocher.set(epocher);
98    }
99
100    /// Set the execution node for header lookups. Should only be called once.
101    pub(crate) fn set_execution_node(&self, execution_node: TempoFullNode) {
102        let _ = self.execution_node.set(execution_node);
103    }
104
105    /// Get the broadcast sender for events.
106    pub(super) fn events_tx(&self) -> &broadcast::Sender<Event> {
107        &self.events_tx
108    }
109
110    /// Get read access to the internal state.
111    pub(super) fn read(&self) -> parking_lot::RwLockReadGuard<'_, FeedState> {
112        self.state.read()
113    }
114
115    /// Get write access to the internal state.
116    pub(super) fn write(&self) -> parking_lot::RwLockWriteGuard<'_, FeedState> {
117        self.state.write()
118    }
119
120    /// Get the marshal mailbox, logging if not yet set.
121    fn marshal(&self) -> Option<marshal::Mailbox> {
122        let marshal = self.marshal.get().cloned();
123        if marshal.is_none() {
124            tracing::debug!("marshal not yet set");
125        }
126        marshal
127    }
128
129    /// Get the epocher, logging if not yet set.
130    fn epocher(&self) -> Option<FixedEpocher> {
131        let epocher = self.epocher.get().cloned();
132        if epocher.is_none() {
133            tracing::debug!("epocher not yet set");
134        }
135        epocher
136    }
137
138    /// Ensure the identity cache covers `start_epoch` by walking backwards
139    /// if needed. After this returns, the cache is guaranteed to contain
140    /// transition data covering `start_epoch` (as far back as available data allows).
141    #[instrument(skip_all, fields(start_epoch), err)]
142    async fn try_fill_transitions(
143        &self,
144        marshal: &mut marshal::Mailbox,
145        execution: &TempoFullNode,
146        epocher: &FixedEpocher,
147        start_epoch: u64,
148    ) -> Result<(), IdentityProofError> {
149        // Check if the cache already covers this epoch
150        let cached = self.identity_cache.read().clone();
151        if let Some(cache) = &cached
152            && (cache.to_epoch..=cache.from_epoch).contains(&start_epoch)
153        {
154            return Ok(());
155        }
156
157        // Identity active at epoch N is set by the last block of epoch N-1
158        let epoch_outcome = get_outcome(execution, epocher, start_epoch.saturating_sub(1))?;
159        let epoch_pubkey = *epoch_outcome.sharing().public();
160
161        // Fast path: if the identity matches the cached one, no new transitions
162        if let Some(cache) = &cached
163            && start_epoch > cache.from_epoch
164        {
165            let cache_pubkey_bytes = hex::decode(&cache.identity)
166                .map_err(|_| IdentityProofError::MalformedData(cache.from_epoch))?;
167            let cache_pubkey =
168                <MinSig as Variant>::Public::read(&mut cache_pubkey_bytes.as_slice())
169                    .map_err(|_| IdentityProofError::MalformedData(cache.from_epoch))?;
170
171            if cache_pubkey == epoch_pubkey {
172                let mut updated = cache.clone();
173                updated.from_epoch = start_epoch;
174
175                *self.identity_cache.write() = Some(updated);
176                return Ok(());
177            }
178        }
179
180        // Walk backwards to find all identity transitions
181        let mut transitions = Vec::new();
182        let mut pubkey = epoch_pubkey;
183        let mut search_epoch = start_epoch.saturating_sub(1);
184        while search_epoch > 0 {
185            // Absorb cached transitions and stop.
186            if let Some(cache) = &cached
187                && search_epoch <= cache.from_epoch
188            {
189                transitions.extend(cache.transitions.iter().cloned());
190                search_epoch = cache.to_epoch;
191                // We dont continue downwards past to_epoch since the walk only stops if
192                // DKG parsing fails (Internal Error state) or data is unavailable (Pruned).
193                // Both which are not recoverable in the current runtime.
194                break;
195            }
196
197            let prev_outcome = get_outcome(execution, epocher, search_epoch - 1)?;
198            let prev_pubkey = *prev_outcome.sharing().public();
199
200            // If keys differ, there was a full DKG at search_epoch
201            if pubkey != prev_pubkey {
202                let height = epocher
203                    .last(Epoch::new(search_epoch))
204                    .expect("fixed epocher is valid for all epochs");
205
206                let Some(header) = execution
207                    .provider
208                    .sealed_header(height.get())
209                    .ok()
210                    .flatten()
211                else {
212                    tracing::info!(
213                        height = height.get(),
214                        search_epoch,
215                        "stopping identity transition walk early (header not available)"
216                    );
217                    break;
218                };
219
220                let Some(finalization) = marshal.get_finalization(height).await else {
221                    tracing::info!(
222                        height = height.get(),
223                        search_epoch,
224                        "stopping identity transition walk early (finalization pruned)"
225                    );
226                    break;
227                };
228
229                transitions.push(IdentityTransition {
230                    transition_epoch: search_epoch,
231                    old_identity: hex::encode(prev_pubkey.encode()),
232                    new_identity: hex::encode(pubkey.encode()),
233                    proof: Some(TransitionProofData {
234                        header: TempoHeaderResponse::from_consensus_header(header, 0),
235                        finalization_certificate: hex::encode(finalization.encode()),
236                    }),
237                });
238            }
239
240            pubkey = prev_pubkey;
241            search_epoch -= 1;
242        }
243
244        // Append genesis identity as terminal marker when we reached it.
245        if search_epoch == 0 {
246            let has_genesis = transitions
247                .last()
248                .is_some_and(|t| t.transition_epoch == 0 && t.proof.is_none());
249
250            if !has_genesis {
251                match get_outcome(execution, epocher, 0) {
252                    Ok(genesis_outcome) => {
253                        let genesis_pubkey = *genesis_outcome.sharing().public();
254                        let genesis_identity = hex::encode(genesis_pubkey.encode());
255                        transitions.push(IdentityTransition {
256                            transition_epoch: 0,
257                            old_identity: genesis_identity.clone(),
258                            new_identity: genesis_identity,
259                            proof: None,
260                        });
261                    }
262                    Err(err) => {
263                        tracing::debug!(
264                            ?err,
265                            "failed to fetch genesis outcome; omitting genesis marker"
266                        );
267                    }
268                }
269            }
270        }
271
272        // Build updated cache. The walk absorbs cached transitions in the correct order
273        let new_cache = if let Some(c) = &cached {
274            IdentityTransitionCache {
275                from_epoch: start_epoch.max(c.from_epoch),
276                to_epoch: search_epoch.min(c.to_epoch),
277                transitions: Arc::new(transitions),
278                identity: if start_epoch >= c.from_epoch {
279                    hex::encode(epoch_pubkey.encode())
280                } else {
281                    c.identity.clone()
282                },
283            }
284        } else {
285            IdentityTransitionCache {
286                from_epoch: start_epoch,
287                to_epoch: search_epoch,
288                identity: hex::encode(epoch_pubkey.encode()),
289                transitions: Arc::new(transitions),
290            }
291        };
292
293        *self.identity_cache.write() = Some(new_cache);
294        Ok(())
295    }
296}
297
298impl Default for FeedStateHandle {
299    fn default() -> Self {
300        Self::new()
301    }
302}
303
304impl std::fmt::Debug for FeedStateHandle {
305    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306        let state = self.state.read();
307        f.debug_struct("FeedStateHandle")
308            .field("latest_notarized", &state.latest_notarized)
309            .field("latest_finalized", &state.latest_finalized)
310            .field("marshal_set", &self.marshal.get().is_some())
311            .field("execution_node_set", &self.execution_node.get().is_some())
312            .field("subscriber_count", &self.events_tx.receiver_count())
313            .finish()
314    }
315}
316
317impl ConsensusFeed for FeedStateHandle {
318    async fn get_finalization(&self, query: Query) -> Option<CertifiedBlock> {
319        match query {
320            Query::Latest => {
321                let block = self.state.read().latest_finalized.clone()?;
322                Some(block)
323            }
324            Query::Height(height) => {
325                let height = Height::new(height);
326                let marshal = self.marshal()?;
327
328                let finalization = marshal.get_finalization(height).await?;
329                let block = marshal.get_block(height).await?;
330
331                Some(CertifiedBlock {
332                    epoch: finalization.proposal.round.epoch().get(),
333                    view: finalization.proposal.round.view().get(),
334                    block: block.into_inner().into_block(),
335                    digest: finalization.proposal.payload.0,
336                    certificate: hex::encode(finalization.encode()),
337                })
338            }
339        }
340    }
341
342    async fn get_latest(&self) -> ConsensusState {
343        let (finalized, mut notarized) = {
344            let state = self.state.read();
345            (
346                state.latest_finalized.clone(),
347                state.latest_notarized.clone(),
348            )
349        };
350
351        let finalized_round = finalized
352            .as_ref()
353            .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)));
354
355        let notarized_round = notarized
356            .as_ref()
357            .map(|n| Round::new(Epoch::new(n.epoch), View::new(n.view)));
358
359        // Only include the notarization if it is ahead.
360        if finalized_round.is_some_and(|f| notarized_round.is_none_or(|n| n <= f)) {
361            notarized = None;
362        }
363
364        ConsensusState {
365            finalized,
366            notarized,
367        }
368    }
369
370    async fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
371        Some(self.events_tx.subscribe())
372    }
373
374    async fn get_identity_transition_proof(
375        &self,
376        from_epoch: Option<u64>,
377        full: bool,
378    ) -> Result<IdentityTransitionResponse, IdentityProofError> {
379        let Some((mut marshal, epocher)) = self.marshal().zip(self.epocher()) else {
380            return Err(IdentityProofError::NotReady);
381        };
382        let Some(execution_node) = self.execution_node.get() else {
383            return Err(IdentityProofError::NotReady);
384        };
385
386        // Determine starting epoch (from param, or latest finalized)
387        let start_epoch = if let Some(epoch) = from_epoch {
388            epoch
389        } else {
390            marshal
391                .get_info(Identifier::Latest)
392                .await
393                .and_then(|(h, _)| epocher.containing(h))
394                .ok_or(IdentityProofError::NotReady)?
395                .epoch()
396                .get()
397        };
398
399        // Ensure cached transitions are up to date
400        self.try_fill_transitions(&mut marshal, execution_node, &epocher, start_epoch)
401            .await?;
402
403        let cache = self
404            .identity_cache
405            .read()
406            .clone()
407            .ok_or(IdentityProofError::NotReady)?;
408
409        // Filter transitions to only include those at or before start_epoch
410        let transitions: Vec<_> = cache
411            .transitions
412            .iter()
413            .filter(|t| t.transition_epoch <= start_epoch)
414            .cloned()
415            .collect();
416
417        // Determine identity at start_epoch by finding the closest transition
418        // AFTER start_epoch and using its old_identity (the key before that change).
419        // Transitions are newest-to-oldest, so the last match is the closest.
420        let identity = cache
421            .transitions
422            .iter()
423            .filter(|t| t.transition_epoch > start_epoch)
424            .last()
425            .map(|t| t.old_identity.clone())
426            .unwrap_or_else(|| cache.identity.clone());
427
428        // If not full, only return the most recent real transition (exclude genesis marker)
429        let transitions = if full {
430            transitions
431        } else {
432            transitions
433                .into_iter()
434                .filter(|t| t.transition_epoch > 0)
435                .take(1)
436                .collect()
437        };
438
439        Ok(IdentityTransitionResponse {
440            identity,
441            transitions,
442        })
443    }
444}
445
446/// Fetch last block of epoch and decode DKG outcome.
447fn get_outcome(
448    execution: &TempoFullNode,
449    epocher: &FixedEpocher,
450    epoch: u64,
451) -> Result<OnchainDkgOutcome, IdentityProofError> {
452    let height = epocher
453        .last(Epoch::new(epoch))
454        .expect("fixed epocher is valid for all epochs");
455    let header = execution
456        .provider
457        .header_by_number(height.get())
458        .ok()
459        .flatten()
460        .ok_or(IdentityProofError::PrunedData(height.get()))?;
461    OnchainDkgOutcome::read(&mut header.extra_data().as_ref())
462        .map_err(|_| IdentityProofError::MalformedData(height.get()))
463}