tempo_commonware_node/follow/upstream/
in_process.rs1use std::{sync::Arc, time::Duration};
6
7use commonware_consensus::{Reporter, types::Height};
8use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
9use futures::{
10 FutureExt as _, StreamExt as _,
11 stream::{self, BoxStream, Fuse},
12};
13use tempo_node::{
14 TempoFullNode,
15 rpc::consensus::{CertifiedBlock, ConsensusFeed as _, Event, Query},
16};
17use tokio::{
18 select,
19 sync::{mpsc, oneshot},
20};
21use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
22use tracing::{debug, debug_span, info, instrument};
23
24use crate::{feed::FeedStateHandle, utils::OptionFuture};
25
26use super::ingress::{Mailbox, Message};
27
/// Configuration for the in-process upstream [`Actor`].
pub struct Config {
    /// Handle to the full node running in this process.
    ///
    /// NOTE(review): not read anywhere in the visible part of this module;
    /// presumably kept for use elsewhere — confirm.
    pub execution_node: Arc<TempoFullNode>,
    /// Handle to the consensus feed. The actor subscribes to consensus events
    /// through it and uses it to answer finalization lookups.
    pub feed: FeedStateHandle,
}
32
33pub fn init<TContext>(context: TContext, config: Config) -> (Actor<TContext>, Mailbox) {
34 let (tx, rx) = mpsc::unbounded_channel();
35 let mailbox = Mailbox::new(tx);
36
37 let actor = Actor {
38 context: ContextCell::new(context),
39 config,
40 event_stream: stream::empty::<Result<Event, BroadcastStreamRecvError>>()
41 .boxed()
42 .fuse(),
43 mailbox: rx,
44 waiters: Vec::new(),
45 };
46 (actor, mailbox)
47}
48
/// Actor that forwards consensus events from the in-process feed to a reporter
/// and serves finalization lookups received through its mailbox.
pub struct Actor<TContext> {
    /// Runtime context; used to spawn the event loop and the per-request lookup tasks.
    context: ContextCell<TContext>,
    /// Static configuration, including the feed handle.
    config: Config,
    /// Stream of consensus events. Starts out empty and is replaced with the
    /// broadcast subscription once the feed provides one.
    event_stream: Fuse<BoxStream<'static, Result<Event, BroadcastStreamRecvError>>>,
    /// Receiving half of the channel that backs the actor's `Mailbox`.
    mailbox: mpsc::UnboundedReceiver<Message>,

    /// Finalization requests (height plus response channel) buffered until the
    /// event subscription has been established.
    waiters: Vec<(Height, oneshot::Sender<Option<CertifiedBlock>>)>,
}
57
58impl<TContext> Actor<TContext>
59where
60 TContext: Clock + Metrics + Spawner,
61{
62 pub(crate) fn start(
63 mut self,
64 reporter: impl Reporter<Activity = Event>,
65 ) -> commonware_runtime::Handle<()> {
66 spawn_cell!(self.context, self.run(reporter))
67 }
68
69 async fn run(mut self, mut reporter: impl Reporter<Activity = Event>) {
70 let feed = self.config.feed.clone();
71 let context = self.context.clone();
72 let mut pending_subscription = OptionFuture::some(
73 async move {
74 loop {
75 if let Some(subscription) = feed.subscribe().await {
76 break subscription;
77 }
78 info!("feed state not yet ready, retrying in 1s");
79 context.sleep(Duration::from_secs(1)).await;
80 }
81 }
82 .boxed(),
83 );
84 let mut connected = false;
85 loop {
86 select!(
87 biased;
88
89 stream = &mut pending_subscription, if pending_subscription.is_some() => {
90 debug_span!("consensus_event_subscription")
91 .in_scope(|| debug!("subscription for consensus events established"));
92 self.event_stream = tokio_stream::wrappers::BroadcastStream::new(stream).boxed().fuse();
93 connected = true;
94 }
95
96 Some(event) = self.event_stream.next() => {
97 debug_span!("consensus_event").in_scope(|| debug!(
98 ?event, "received consensus event, forwarding to reporter"
99 ));
100 match event {
101 Ok(event) => reporter.report(event).await,
102 Err(BroadcastStreamRecvError::Lagged(events_skipped)) => {
103 debug_span!("subscription").in_scope(|| debug!(
104 events_skipped,
105 "lagged behind and skipped some events",
106 ));
107 },
108 }
109 }
110
111 Some(msg) = self.mailbox.recv() => {
112 match msg {
113 super::ingress::Message::GetFinalization { height, response, } => {
114 self.waiters.push((height, response));
115 }
116 }
117 }
118 );
119 if connected {
120 for (height, response) in self.waiters.drain(..) {
121 let feed = self.config.feed.clone();
122 self.context
123 .with_label("get_finalization")
124 .spawn(move |_| get_finalization(feed, height, response));
125 }
126 }
127 }
128 }
129}
130
131#[instrument(skip_all, fields(%height), err)]
132async fn get_finalization(
133 client: FeedStateHandle,
134 height: Height,
135 response: oneshot::Sender<Option<CertifiedBlock>>,
136) -> eyre::Result<()> {
137 let finalization = match client.get_finalization(Query::Height(height.get())).await {
140 tempo_node::rpc::consensus::types::Response::Success(val) => Some(val),
141 tempo_node::rpc::consensus::types::Response::NotReady => {
142 panic!("for in-process execution the feed should be immediately available")
143 }
144 tempo_node::rpc::consensus::types::Response::Missing(_) => None,
145 };
146 response
147 .send(finalization)
148 .map_err(|_| eyre::eyre!("receiver went away"))
149}