Skip to main content

tempo_commonware_node/feed/
actor.rs

1//! Feed actor implementation.
2//!
3//! This actor:
4//! - Receives consensus activity (notarizations, finalizations)
5//! - Updates shared state (accessible by RPC handlers)
6//! - Broadcasts events to subscribers
7//!
8//! Block resolution uses [`marshal::Mailbox::subscribe_by_digest`] to wait for the block
9//! to become available, avoiding a race where the block hasn't been stored yet
10//! when the activity arrives.
11//!
12//! The actor always polls the oldest (lowest-round) pending subscription so
13//! that events are emitted in order. Notarizations are dropped when a finalization
14//! at a higher-or-equal round is pending, since the finalization supersedes them.
15
16use alloy_primitives::hex;
17use commonware_codec::Encode;
18use commonware_consensus::{
19    simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Activity},
20    types::{Epoch, FixedEpocher, Round, View},
21};
22use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
23use commonware_macros::select;
24use commonware_runtime::{ContextCell, Handle, Spawner, spawn_cell};
25use commonware_utils::channel::oneshot;
26use eyre::eyre;
27use futures::{FutureExt, StreamExt};
28use std::{
29    collections::BTreeMap,
30    future::Future,
31    pin::Pin,
32    task::{Context, Poll},
33    time::{SystemTime, UNIX_EPOCH},
34};
35use tempo_node::{
36    TempoFullNode,
37    rpc::consensus::{CertifiedBlock, Event},
38};
39use tracing::{error, info_span, instrument, warn, warn_span};
40
41use super::state::FeedStateHandle;
42use crate::{
43    alias::marshal,
44    consensus::{Digest, block::Block},
45    utils::OptionFuture,
46};
47
48/// Type alias for the activity type used by the feed actor.
49pub(super) type FeedActivity = Activity<Scheme<PublicKey, MinSig>, Digest>;
50
51/// Receiver for activity messages.
52pub(super) type Receiver = futures::channel::mpsc::UnboundedReceiver<FeedActivity>;
53
54/// A pending block subscription paired with its originating activity.
55///
56/// Resolves to `(Round, FeedActivity, Block)` when the block becomes available.
57struct PendingSubscription {
58    round: Round,
59    activity: Option<FeedActivity>,
60    block_rx: oneshot::Receiver<Block>,
61}
62
63impl PendingSubscription {
64    fn new(round: Round, activity: FeedActivity, block_rx: oneshot::Receiver<Block>) -> Self {
65        Self {
66            round,
67            activity: Some(activity),
68            block_rx,
69        }
70    }
71}
72
73impl Future for PendingSubscription {
74    type Output = eyre::Result<(Round, FeedActivity, Block)>;
75
76    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77        match self.block_rx.poll_unpin(cx) {
78            Poll::Ready(Ok(block)) => {
79                let activity = self.activity.take().expect("polled after completion");
80                Poll::Ready(Ok((self.round, activity, block)))
81            }
82            Poll::Ready(Err(_)) => Poll::Ready(Err(eyre::eyre!("block subscription cancelled"))),
83            Poll::Pending => Poll::Pending,
84        }
85    }
86}
87
88pub(crate) struct Actor<TContext> {
89    /// Runtime context.
90    context: ContextCell<TContext>,
91    /// Receiver for activity messages.
92    receiver: Receiver,
93    /// Shared state handle.
94    state: FeedStateHandle,
95    /// Marshal mailbox for block lookups.
96    marshal: marshal::Mailbox,
97    /// Pending block subscriptions keyed by round. Since finalizations
98    /// must be delivered, pending subscriptions are bound by the marshal.
99    pending: BTreeMap<Round, PendingSubscription>,
100}
101
102impl<TContext: Spawner> Actor<TContext> {
103    /// Create a new feed actor.
104    ///
105    /// The actor receives Activity messages via `receiver` and updates the shared `state`.
106    pub(crate) fn new(
107        context: TContext,
108        marshal: marshal::Mailbox,
109        epocher: FixedEpocher,
110        execution_node: TempoFullNode,
111        receiver: Receiver,
112        state: FeedStateHandle,
113    ) -> Self {
114        state.set_marshal(marshal.clone());
115        state.set_epocher(epocher);
116        state.set_execution_node(execution_node);
117
118        Self {
119            context: ContextCell::new(context),
120            receiver,
121            state,
122            marshal,
123            pending: BTreeMap::new(),
124        }
125    }
126
127    /// Start the actor, returning a handle to the spawned task.
128    pub(crate) fn start(mut self) -> Handle<()> {
129        spawn_cell!(self.context, self.run().await)
130    }
131
132    /// Run the actor's main loop.
133    ///
134    /// The loop races the oldest pending block subscription
135    /// against incoming activity so events are emitted in order.
136    async fn run(&mut self) {
137        let reason = loop {
138            // We need a mutable reference to poll pending subscription. Thus if a new activity arrives,
139            // we also need to re-insert this popped subscription.
140            let mut oldest = OptionFuture::from(self.pending.pop_first().map(|(_, p)| p));
141
142            select!(
143                result = &mut oldest => {
144                    match result {
145                        Ok((_, activity, block)) => self.handle_activity(activity, block),
146                        Err(error) => warn_span!("feed_actor").in_scope(||
147                            warn!(%error, "did not get pending block")
148                        ),
149                    }
150                },
151
152                activity = self.receiver.next() => {
153                    let Some(activity) = activity else {
154                        break eyre!("mailbox closed");
155                    };
156
157                    if let Some(p) = oldest.take() {
158                        self.pending.insert(p.round, p);
159                    }
160                    self.subscribe(activity).await;
161                },
162            );
163        };
164
165        info_span!("feed_actor").in_scope(|| error!(%reason, "shutting down"));
166    }
167
168    async fn subscribe(&mut self, activity: FeedActivity) {
169        let (round, payload) = match &activity {
170            Activity::Notarization(n) => (n.proposal.round, n.proposal.payload),
171            Activity::Finalization(f) => (f.proposal.round, f.proposal.payload),
172            _ => return,
173        };
174
175        // Prune & filter incoming activity.
176        // - Incoming Finalization. Prune older subscriptions as we only care about latest information
177        // - Incoming Notarization. Only accept if ahead of the latest Finalization.
178        match &activity {
179            Activity::Finalization(_) => self.pending.retain(|&r, p| {
180                matches!(&p.activity, Some(Activity::Finalization(_))) || r > round
181            }),
182            Activity::Notarization(_)
183                if self
184                    .state
185                    .read()
186                    .latest_finalized
187                    .as_ref()
188                    .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)))
189                    .is_none_or(|f| f < round) => {}
190
191            _ => return,
192        }
193
194        let block_rx = self.marshal.subscribe_by_digest(Some(round), payload).await;
195        let pending = PendingSubscription::new(round, activity, block_rx);
196        self.pending.insert(round, pending);
197    }
198
199    #[instrument(skip_all, fields(activity = ?activity))]
200    fn handle_activity(&self, activity: FeedActivity, consensus_block: Block) {
201        let block = consensus_block.into_inner().into_block();
202        let (round, digest, certificate) = match activity.clone() {
203            Activity::Notarization(notarization) => (
204                notarization.proposal.round,
205                notarization.proposal.payload.0,
206                notarization.encode(),
207            ),
208            Activity::Finalization(finalization) => (
209                finalization.proposal.round,
210                finalization.proposal.payload.0,
211                finalization.encode(),
212            ),
213            _ => return,
214        };
215
216        let certified = CertifiedBlock {
217            epoch: round.epoch().get(),
218            view: round.view().get(),
219            block,
220            digest,
221            certificate: hex::encode(certificate),
222        };
223
224        let mut state = self.state.write();
225        let latest_finalized_round = state
226            .latest_finalized
227            .as_ref()
228            .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
229        let latest_notarized_round = state
230            .latest_notarized
231            .as_ref()
232            .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
233
234        // Update state and broadcast events
235        match activity {
236            Activity::Notarization(_) => {
237                let _ = self.state.events_tx().send(Event::Notarized {
238                    block: certified.clone(),
239                    seen: now_millis(),
240                });
241
242                if latest_finalized_round.is_none_or(|r| r < round)
243                    && latest_notarized_round.is_none_or(|r| r < round)
244                {
245                    state.latest_notarized = Some(certified);
246                }
247            }
248
249            Activity::Finalization(_) => {
250                let _ = self.state.events_tx().send(Event::Finalized {
251                    block: certified.clone(),
252                    seen: now_millis(),
253                });
254
255                if latest_finalized_round.is_none_or(|r| r < round) {
256                    if latest_notarized_round.is_none_or(|r| r < round) {
257                        state.latest_notarized = None;
258                    }
259
260                    state.latest_finalized = Some(certified);
261                }
262            }
263            _ => {}
264        }
265    }
266}
267
/// Get the current Unix timestamp in milliseconds.
///
/// Returns `0` if the system clock reports a time before the Unix epoch. The
/// millisecond count is converted with a checked conversion and saturates at
/// `u64::MAX` instead of silently truncating.
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}