tempo_commonware_node/feed/
state.rs1use crate::alias::marshal;
4use alloy_consensus::BlockHeader as _;
5use alloy_primitives::hex;
6use commonware_codec::{Encode, ReadExt as _};
7use commonware_consensus::{
8 marshal::Identifier,
9 types::{Epoch, Epocher as _, FixedEpocher, Height, Round, View},
10};
11use commonware_cryptography::bls12381::primitives::variant::{MinSig, Variant};
12use parking_lot::RwLock;
13use reth_provider::HeaderProvider as _;
14use reth_rpc_convert::transaction::FromConsensusHeader;
15use std::sync::{Arc, OnceLock};
16use tempo_alloy::rpc::TempoHeaderResponse;
17use tempo_dkg_onchain_artifacts::OnchainDkgOutcome;
18use tempo_node::{
19 TempoFullNode,
20 rpc::consensus::{
21 CertifiedBlock, ConsensusFeed, ConsensusState, Event, IdentityProofError,
22 IdentityTransition, IdentityTransitionResponse, Query, TransitionProofData,
23 },
24};
25use tokio::sync::broadcast;
26use tracing::instrument;
27
28const BROADCAST_CHANNEL_SIZE: usize = 1024;
29
30pub(super) struct FeedState {
32 pub(super) latest_notarized: Option<CertifiedBlock>,
34 pub(super) latest_finalized: Option<CertifiedBlock>,
36}
37
38#[derive(Clone, Default)]
43struct IdentityTransitionCache {
44 from_epoch: u64,
46 to_epoch: u64,
48 identity: String,
50 transitions: Arc<Vec<IdentityTransition>>,
52}
53
54#[derive(Clone)]
60pub struct FeedStateHandle {
61 state: Arc<RwLock<FeedState>>,
62 marshal: Arc<OnceLock<marshal::Mailbox>>,
63 epocher: Arc<OnceLock<FixedEpocher>>,
64 execution_node: Arc<OnceLock<TempoFullNode>>,
65 events_tx: broadcast::Sender<Event>,
66 identity_cache: Arc<RwLock<Option<IdentityTransitionCache>>>,
68}
69
70impl FeedStateHandle {
71 pub fn new() -> Self {
76 let (events_tx, _) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
77 Self {
78 state: Arc::new(RwLock::new(FeedState {
79 latest_notarized: None,
80 latest_finalized: None,
81 })),
82 marshal: Arc::new(OnceLock::new()),
83 epocher: Arc::new(OnceLock::new()),
84 execution_node: Arc::new(OnceLock::new()),
85 events_tx,
86 identity_cache: Arc::new(RwLock::new(None)),
87 }
88 }
89
90 pub(crate) fn set_marshal(&self, marshal: marshal::Mailbox) {
92 let _ = self.marshal.set(marshal);
93 }
94
95 pub(crate) fn set_epocher(&self, epocher: FixedEpocher) {
97 let _ = self.epocher.set(epocher);
98 }
99
100 pub(crate) fn set_execution_node(&self, execution_node: TempoFullNode) {
102 let _ = self.execution_node.set(execution_node);
103 }
104
105 pub(super) fn events_tx(&self) -> &broadcast::Sender<Event> {
107 &self.events_tx
108 }
109
110 pub(super) fn read(&self) -> parking_lot::RwLockReadGuard<'_, FeedState> {
112 self.state.read()
113 }
114
115 pub(super) fn write(&self) -> parking_lot::RwLockWriteGuard<'_, FeedState> {
117 self.state.write()
118 }
119
120 fn marshal(&self) -> Option<marshal::Mailbox> {
122 let marshal = self.marshal.get().cloned();
123 if marshal.is_none() {
124 tracing::debug!("marshal not yet set");
125 }
126 marshal
127 }
128
129 fn epocher(&self) -> Option<FixedEpocher> {
131 let epocher = self.epocher.get().cloned();
132 if epocher.is_none() {
133 tracing::debug!("epocher not yet set");
134 }
135 epocher
136 }
137
138 #[instrument(skip_all, fields(start_epoch), err)]
142 async fn try_fill_transitions(
143 &self,
144 marshal: &mut marshal::Mailbox,
145 execution: &TempoFullNode,
146 epocher: &FixedEpocher,
147 start_epoch: u64,
148 ) -> Result<(), IdentityProofError> {
149 let cached = self.identity_cache.read().clone();
151 if let Some(cache) = &cached
152 && (cache.to_epoch..=cache.from_epoch).contains(&start_epoch)
153 {
154 return Ok(());
155 }
156
157 let epoch_outcome = get_outcome(execution, epocher, start_epoch.saturating_sub(1))?;
159 let epoch_pubkey = *epoch_outcome.sharing().public();
160
161 if let Some(cache) = &cached
163 && start_epoch > cache.from_epoch
164 {
165 let cache_pubkey_bytes = hex::decode(&cache.identity)
166 .map_err(|_| IdentityProofError::MalformedData(cache.from_epoch))?;
167 let cache_pubkey =
168 <MinSig as Variant>::Public::read(&mut cache_pubkey_bytes.as_slice())
169 .map_err(|_| IdentityProofError::MalformedData(cache.from_epoch))?;
170
171 if cache_pubkey == epoch_pubkey {
172 let mut updated = cache.clone();
173 updated.from_epoch = start_epoch;
174
175 *self.identity_cache.write() = Some(updated);
176 return Ok(());
177 }
178 }
179
180 let mut transitions = Vec::new();
182 let mut pubkey = epoch_pubkey;
183 let mut search_epoch = start_epoch.saturating_sub(1);
184 while search_epoch > 0 {
185 if let Some(cache) = &cached
187 && search_epoch <= cache.from_epoch
188 {
189 transitions.extend(cache.transitions.iter().cloned());
190 search_epoch = cache.to_epoch;
191 break;
195 }
196
197 let prev_outcome = get_outcome(execution, epocher, search_epoch - 1)?;
198 let prev_pubkey = *prev_outcome.sharing().public();
199
200 if pubkey != prev_pubkey {
202 let height = epocher
203 .last(Epoch::new(search_epoch))
204 .expect("fixed epocher is valid for all epochs");
205
206 let Some(header) = execution
207 .provider
208 .sealed_header(height.get())
209 .ok()
210 .flatten()
211 else {
212 tracing::info!(
213 height = height.get(),
214 search_epoch,
215 "stopping identity transition walk early (header not available)"
216 );
217 break;
218 };
219
220 let Some(finalization) = marshal.get_finalization(height).await else {
221 tracing::info!(
222 height = height.get(),
223 search_epoch,
224 "stopping identity transition walk early (finalization pruned)"
225 );
226 break;
227 };
228
229 transitions.push(IdentityTransition {
230 transition_epoch: search_epoch,
231 old_identity: hex::encode(prev_pubkey.encode()),
232 new_identity: hex::encode(pubkey.encode()),
233 proof: Some(TransitionProofData {
234 header: TempoHeaderResponse::from_consensus_header(header, 0),
235 finalization_certificate: hex::encode(finalization.encode()),
236 }),
237 });
238 }
239
240 pubkey = prev_pubkey;
241 search_epoch -= 1;
242 }
243
244 if search_epoch == 0 {
246 let has_genesis = transitions
247 .last()
248 .is_some_and(|t| t.transition_epoch == 0 && t.proof.is_none());
249
250 if !has_genesis {
251 match get_outcome(execution, epocher, 0) {
252 Ok(genesis_outcome) => {
253 let genesis_pubkey = *genesis_outcome.sharing().public();
254 let genesis_identity = hex::encode(genesis_pubkey.encode());
255 transitions.push(IdentityTransition {
256 transition_epoch: 0,
257 old_identity: genesis_identity.clone(),
258 new_identity: genesis_identity,
259 proof: None,
260 });
261 }
262 Err(err) => {
263 tracing::debug!(
264 ?err,
265 "failed to fetch genesis outcome; omitting genesis marker"
266 );
267 }
268 }
269 }
270 }
271
272 let new_cache = if let Some(c) = &cached {
274 IdentityTransitionCache {
275 from_epoch: start_epoch.max(c.from_epoch),
276 to_epoch: search_epoch.min(c.to_epoch),
277 transitions: Arc::new(transitions),
278 identity: if start_epoch >= c.from_epoch {
279 hex::encode(epoch_pubkey.encode())
280 } else {
281 c.identity.clone()
282 },
283 }
284 } else {
285 IdentityTransitionCache {
286 from_epoch: start_epoch,
287 to_epoch: search_epoch,
288 identity: hex::encode(epoch_pubkey.encode()),
289 transitions: Arc::new(transitions),
290 }
291 };
292
293 *self.identity_cache.write() = Some(new_cache);
294 Ok(())
295 }
296}
297
298impl Default for FeedStateHandle {
299 fn default() -> Self {
300 Self::new()
301 }
302}
303
304impl std::fmt::Debug for FeedStateHandle {
305 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
306 let state = self.state.read();
307 f.debug_struct("FeedStateHandle")
308 .field("latest_notarized", &state.latest_notarized)
309 .field("latest_finalized", &state.latest_finalized)
310 .field("marshal_set", &self.marshal.get().is_some())
311 .field("execution_node_set", &self.execution_node.get().is_some())
312 .field("subscriber_count", &self.events_tx.receiver_count())
313 .finish()
314 }
315}
316
317impl ConsensusFeed for FeedStateHandle {
318 async fn get_finalization(&self, query: Query) -> Option<CertifiedBlock> {
319 match query {
320 Query::Latest => {
321 let block = self.state.read().latest_finalized.clone()?;
322 Some(block)
323 }
324 Query::Height(height) => {
325 let height = Height::new(height);
326 let marshal = self.marshal()?;
327
328 let finalization = marshal.get_finalization(height).await?;
329 let block = marshal.get_block(height).await?;
330
331 Some(CertifiedBlock {
332 epoch: finalization.proposal.round.epoch().get(),
333 view: finalization.proposal.round.view().get(),
334 block: block.into_inner().into_block(),
335 digest: finalization.proposal.payload.0,
336 certificate: hex::encode(finalization.encode()),
337 })
338 }
339 }
340 }
341
342 async fn get_latest(&self) -> ConsensusState {
343 let (finalized, mut notarized) = {
344 let state = self.state.read();
345 (
346 state.latest_finalized.clone(),
347 state.latest_notarized.clone(),
348 )
349 };
350
351 let finalized_round = finalized
352 .as_ref()
353 .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)));
354
355 let notarized_round = notarized
356 .as_ref()
357 .map(|n| Round::new(Epoch::new(n.epoch), View::new(n.view)));
358
359 if finalized_round.is_some_and(|f| notarized_round.is_none_or(|n| n <= f)) {
361 notarized = None;
362 }
363
364 ConsensusState {
365 finalized,
366 notarized,
367 }
368 }
369
370 async fn subscribe(&self) -> Option<broadcast::Receiver<Event>> {
371 Some(self.events_tx.subscribe())
372 }
373
374 async fn get_identity_transition_proof(
375 &self,
376 from_epoch: Option<u64>,
377 full: bool,
378 ) -> Result<IdentityTransitionResponse, IdentityProofError> {
379 let Some((mut marshal, epocher)) = self.marshal().zip(self.epocher()) else {
380 return Err(IdentityProofError::NotReady);
381 };
382 let Some(execution_node) = self.execution_node.get() else {
383 return Err(IdentityProofError::NotReady);
384 };
385
386 let start_epoch = if let Some(epoch) = from_epoch {
388 epoch
389 } else {
390 marshal
391 .get_info(Identifier::Latest)
392 .await
393 .and_then(|(h, _)| epocher.containing(h))
394 .ok_or(IdentityProofError::NotReady)?
395 .epoch()
396 .get()
397 };
398
399 self.try_fill_transitions(&mut marshal, execution_node, &epocher, start_epoch)
401 .await?;
402
403 let cache = self
404 .identity_cache
405 .read()
406 .clone()
407 .ok_or(IdentityProofError::NotReady)?;
408
409 let transitions: Vec<_> = cache
411 .transitions
412 .iter()
413 .filter(|t| t.transition_epoch <= start_epoch)
414 .cloned()
415 .collect();
416
417 let identity = cache
421 .transitions
422 .iter()
423 .filter(|t| t.transition_epoch > start_epoch)
424 .last()
425 .map(|t| t.old_identity.clone())
426 .unwrap_or_else(|| cache.identity.clone());
427
428 let transitions = if full {
430 transitions
431 } else {
432 transitions
433 .into_iter()
434 .filter(|t| t.transition_epoch > 0)
435 .take(1)
436 .collect()
437 };
438
439 Ok(IdentityTransitionResponse {
440 identity,
441 transitions,
442 })
443 }
444}
445
446fn get_outcome(
448 execution: &TempoFullNode,
449 epocher: &FixedEpocher,
450 epoch: u64,
451) -> Result<OnchainDkgOutcome, IdentityProofError> {
452 let height = epocher
453 .last(Epoch::new(epoch))
454 .expect("fixed epocher is valid for all epochs");
455 let header = execution
456 .provider
457 .header_by_number(height.get())
458 .ok()
459 .flatten()
460 .ok_or(IdentityProofError::PrunedData(height.get()))?;
461 OnchainDkgOutcome::read(&mut header.extra_data().as_ref())
462 .map_err(|_| IdentityProofError::MalformedData(height.get()))
463}