tempo_commonware_node/follow/upstream/
actor.rs1use std::{sync::Arc, time::Duration};
2
3use commonware_consensus::{Reporter, types::Height};
4use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
5use eyre::{Report, WrapErr as _};
6use futures::{
7 FutureExt as _, StreamExt as _,
8 future::BoxFuture,
9 stream::{BoxStream, Fuse},
10};
11use jsonrpsee::{
12 core::{client, client::Subscription},
13 ws_client::{WsClient, WsClientBuilder},
14};
15use tempo_node::rpc::consensus::{CertifiedBlock, Event, Query, TempoConsensusApiClient};
16use tempo_telemetry_util::display_duration;
17use tokio::{
18 select,
19 sync::{mpsc, oneshot},
20};
21use tracing::{debug, debug_span, instrument, warn, warn_span};
22
23use crate::utils::OptionFuture;
24
25type EventStream = Fuse<BoxStream<'static, Result<Event, serde_json::Error>>>;
26
27pub(crate) struct Actor<TContext> {
32 pub(super) context: ContextCell<TContext>,
33 pub(super) connection: Option<Arc<WsClient>>,
34 pub(super) mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
35 pub(super) url: &'static str,
36 pub(super) pending_connect: OptionFuture<BoxFuture<'static, (u64, eyre::Result<WsClient>)>>,
37 pub(super) pending_stream:
38 OptionFuture<BoxFuture<'static, Result<Subscription<Event>, client::Error>>>,
39 pub(super) event_stream: EventStream,
40 pub(super) waiters: Vec<(Height, oneshot::Sender<Option<CertifiedBlock>>)>,
42}
43
44impl<TContext> Actor<TContext>
45where
46 TContext: Clock + Metrics + Spawner,
47{
48 pub(crate) fn start(
49 mut self,
50 reporter: impl Reporter<Activity = Event>,
51 ) -> commonware_runtime::Handle<()> {
52 spawn_cell!(self.context, self.run(reporter))
53 }
54
55 async fn run(mut self, mut reporter: impl Reporter<Activity = Event>) {
56 self.pending_connect.replace({
57 let url = self.url;
58 async move {
59 (
60 1,
61 WsClientBuilder::default()
62 .build(&url)
63 .await
64 .map_err(Report::new),
65 )
66 }
67 .boxed()
68 });
69 loop {
70 select!(
71 biased;
72
73 error = OptionFuture::new(self.connection.as_ref().map(|c| c.on_disconnect()))
74 => {
75 warn_span!("connection").in_scope(|| warn!(
76 reason = %Report::new(error),
77 url = self.url,
78 "connection to upstream node disconnected, attempting reconnect",
79 ));
80 self.connection.take();
81 self.pending_stream.take();
82 self.pending_connect.replace({
83 let url = self.url;
84 async move {
85 (1, WsClientBuilder::default().build(&url).await.map_err(Report::new))
86 }.boxed()
87 });
88 }
89
90 (attempts, client) = &mut self.pending_connect => {
91 match client {
92 Ok(client) => {
93 let client = Arc::new(client);
94 self.connection.replace(client.clone());
95 self.pending_stream.replace(async move {
96 client.subscribe_events().await
97 }.boxed());
98 }
99 Err(reason) => {
100 let reconnect_in = Duration::from_secs(attempts.saturating_mul(1).min(20));
101 warn_span!("reconnect").in_scope(|| warn!(
102 %reason,
103 attempts,
104 reconnect_in = %display_duration(reconnect_in),
105 url = self.url,
106 "connecting to upstream node failed, attempting again",
107 ));
108 self.pending_connect.replace({
109 let context = self.context.clone();
110 let url = self.url;
111 async move {
112 context.sleep(reconnect_in).await;
113 (1, WsClientBuilder::default().build(&url).await.map_err(Report::new))
114 }.boxed()
115 });
116 }
117 }
118 }
119
120 stream = &mut self.pending_stream => {
121 match stream {
122 Ok(stream) => {
123 debug_span!("consensus_event_subscription")
124 .in_scope(|| debug!("subscription for consensus events established"));
125 self.event_stream = stream.boxed().fuse();
126 }
127 Err(error) => {
128 warn_span!("event_subscription").in_scope(|| warn!(
129 reason = %Report::new(error),
130 "failed subscribing to events; retrying"
131 ));
132 if let Some(client) = self.connection.clone() {
133 self.pending_stream.replace(async move {
134 client.subscribe_events().await
135 }.boxed());
136 }
137 }
138 }
139 }
140
141 Some(event) = self.event_stream.next() => {
142 debug_span!("consensus_event").in_scope(|| debug!(
143 ?event, "received consensus event, forwarding to reporter"
144 ));
145 match event {
146 Ok(event) => reporter.report(event).await,
147 Err(error) => warn_span!("event").in_scope(|| warn!(
148 %error,
149 "event stream encountered an error",
150 )),
151 }
152 }
153
154 Some(msg) = self.mailbox.recv() => {
155 match msg {
156 super::ingress::Message::GetFinalization { height, response, } => {
157 self.waiters.push((height, response));
158 }
159 }
160 }
161 );
162
163 if let Some(client) = &self.connection {
164 for (height, response) in self.waiters.drain(..) {
165 let client = client.clone();
166 self.context
167 .with_label("get_finalization")
168 .spawn(move |_| get_finalization(client, height, response));
169 }
170 }
171 }
172 }
173}
174
175#[instrument(skip_all, fields(%height), err)]
176async fn get_finalization(
177 client: Arc<WsClient>,
178 height: Height,
179 response: oneshot::Sender<Option<CertifiedBlock>>,
180) -> eyre::Result<()> {
181 let finalization = client
184 .get_finalization(Query::Height(height.get()))
185 .await
186 .wrap_err("failed getting finalization")?;
187 response
188 .send(Some(finalization))
189 .map_err(|_| eyre::eyre!("receiver went away"))
190}