tempo_commonware_node/feed/
actor.rs1use alloy_primitives::hex;
17use commonware_codec::Encode;
18use commonware_consensus::{
19 simplex::{scheme::bls12381_threshold::vrf::Scheme, types::Activity},
20 types::{Epoch, FixedEpocher, Round, View},
21};
22use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey};
23use commonware_macros::select;
24use commonware_runtime::{ContextCell, Handle, Spawner, spawn_cell};
25use commonware_utils::channel::oneshot;
26use eyre::eyre;
27use futures::{FutureExt, StreamExt};
28use std::{
29 collections::BTreeMap,
30 future::Future,
31 pin::Pin,
32 task::{Context, Poll},
33 time::{SystemTime, UNIX_EPOCH},
34};
35use tempo_node::{
36 TempoFullNode,
37 rpc::consensus::{CertifiedBlock, Event},
38};
39use tracing::{error, info_span, instrument, warn, warn_span};
40
41use super::state::FeedStateHandle;
42use crate::{
43 alias::marshal,
44 consensus::{Digest, block::Block},
45 utils::OptionFuture,
46};
47
48pub(super) type FeedActivity = Activity<Scheme<PublicKey, MinSig>, Digest>;
50
51pub(super) type Receiver = futures::channel::mpsc::UnboundedReceiver<FeedActivity>;
53
54struct PendingSubscription {
58 round: Round,
59 activity: Option<FeedActivity>,
60 block_rx: oneshot::Receiver<Block>,
61}
62
63impl PendingSubscription {
64 fn new(round: Round, activity: FeedActivity, block_rx: oneshot::Receiver<Block>) -> Self {
65 Self {
66 round,
67 activity: Some(activity),
68 block_rx,
69 }
70 }
71}
72
73impl Future for PendingSubscription {
74 type Output = eyre::Result<(Round, FeedActivity, Block)>;
75
76 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
77 match self.block_rx.poll_unpin(cx) {
78 Poll::Ready(Ok(block)) => {
79 let activity = self.activity.take().expect("polled after completion");
80 Poll::Ready(Ok((self.round, activity, block)))
81 }
82 Poll::Ready(Err(_)) => Poll::Ready(Err(eyre::eyre!("block subscription cancelled"))),
83 Poll::Pending => Poll::Pending,
84 }
85 }
86}
87
88pub(crate) struct Actor<TContext> {
89 context: ContextCell<TContext>,
91 receiver: Receiver,
93 state: FeedStateHandle,
95 marshal: marshal::Mailbox,
97 pending: BTreeMap<Round, PendingSubscription>,
100}
101
102impl<TContext: Spawner> Actor<TContext> {
103 pub(crate) fn new(
107 context: TContext,
108 marshal: marshal::Mailbox,
109 epocher: FixedEpocher,
110 execution_node: TempoFullNode,
111 receiver: Receiver,
112 state: FeedStateHandle,
113 ) -> Self {
114 state.set_marshal(marshal.clone());
115 state.set_epocher(epocher);
116 state.set_execution_node(execution_node);
117
118 Self {
119 context: ContextCell::new(context),
120 receiver,
121 state,
122 marshal,
123 pending: BTreeMap::new(),
124 }
125 }
126
127 pub(crate) fn start(mut self) -> Handle<()> {
129 spawn_cell!(self.context, self.run().await)
130 }
131
132 async fn run(&mut self) {
137 let reason = loop {
138 let mut oldest = OptionFuture::from(self.pending.pop_first().map(|(_, p)| p));
141
142 select!(
143 result = &mut oldest => {
144 match result {
145 Ok((_, activity, block)) => self.handle_activity(activity, block),
146 Err(error) => warn_span!("feed_actor").in_scope(||
147 warn!(%error, "did not get pending block")
148 ),
149 }
150 },
151
152 activity = self.receiver.next() => {
153 let Some(activity) = activity else {
154 break eyre!("mailbox closed");
155 };
156
157 if let Some(p) = oldest.take() {
158 self.pending.insert(p.round, p);
159 }
160 self.subscribe(activity).await;
161 },
162 );
163 };
164
165 info_span!("feed_actor").in_scope(|| error!(%reason, "shutting down"));
166 }
167
168 async fn subscribe(&mut self, activity: FeedActivity) {
169 let (round, payload) = match &activity {
170 Activity::Notarization(n) => (n.proposal.round, n.proposal.payload),
171 Activity::Finalization(f) => (f.proposal.round, f.proposal.payload),
172 _ => return,
173 };
174
175 match &activity {
179 Activity::Finalization(_) => self.pending.retain(|&r, p| {
180 matches!(&p.activity, Some(Activity::Finalization(_))) || r > round
181 }),
182 Activity::Notarization(_)
183 if self
184 .state
185 .read()
186 .latest_finalized
187 .as_ref()
188 .map(|f| Round::new(Epoch::new(f.epoch), View::new(f.view)))
189 .is_none_or(|f| f < round) => {}
190
191 _ => return,
192 }
193
194 let block_rx = self.marshal.subscribe_by_digest(Some(round), payload).await;
195 let pending = PendingSubscription::new(round, activity, block_rx);
196 self.pending.insert(round, pending);
197 }
198
199 #[instrument(skip_all, fields(activity = ?activity))]
200 fn handle_activity(&self, activity: FeedActivity, consensus_block: Block) {
201 let block = consensus_block.into_inner().into_block();
202 let (round, digest, certificate) = match activity.clone() {
203 Activity::Notarization(notarization) => (
204 notarization.proposal.round,
205 notarization.proposal.payload.0,
206 notarization.encode(),
207 ),
208 Activity::Finalization(finalization) => (
209 finalization.proposal.round,
210 finalization.proposal.payload.0,
211 finalization.encode(),
212 ),
213 _ => return,
214 };
215
216 let certified = CertifiedBlock {
217 epoch: round.epoch().get(),
218 view: round.view().get(),
219 block,
220 digest,
221 certificate: hex::encode(certificate),
222 };
223
224 let mut state = self.state.write();
225 let latest_finalized_round = state
226 .latest_finalized
227 .as_ref()
228 .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
229 let latest_notarized_round = state
230 .latest_notarized
231 .as_ref()
232 .map(|b| Round::new(Epoch::new(b.epoch), View::new(b.view)));
233
234 match activity {
236 Activity::Notarization(_) => {
237 let _ = self.state.events_tx().send(Event::Notarized {
238 block: certified.clone(),
239 seen: now_millis(),
240 });
241
242 if latest_finalized_round.is_none_or(|r| r < round)
243 && latest_notarized_round.is_none_or(|r| r < round)
244 {
245 state.latest_notarized = Some(certified);
246 }
247 }
248
249 Activity::Finalization(_) => {
250 let _ = self.state.events_tx().send(Event::Finalized {
251 block: certified.clone(),
252 seen: now_millis(),
253 });
254
255 if latest_finalized_round.is_none_or(|r| r < round) {
256 if latest_notarized_round.is_none_or(|r| r < round) {
257 state.latest_notarized = None;
258 }
259
260 state.latest_finalized = Some(certified);
261 }
262 }
263 _ => {}
264 }
265 }
266}
267
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch. The
/// `u128` millisecond count is converted with a checked conversion and
/// saturates at `u64::MAX` instead of being silently truncated by an `as` cast.
fn now_millis() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}