tempo_consensus/follow/upstream/
actor.rs1use std::{sync::Arc, time::Duration};
2
3use commonware_consensus::{Reporter, types::Height};
4use commonware_runtime::{Clock, ContextCell, Metrics, Spawner, spawn_cell};
5use eyre::{Report, WrapErr as _};
6use futures::{
7 FutureExt as _, StreamExt as _,
8 future::{BoxFuture, Either},
9 stream::{self, Fuse, FusedStream},
10};
11use jsonrpsee::{
12 core::{client, client::Subscription},
13 ws_client::{WsClient, WsClientBuilder},
14};
15use rand_08::Rng as _;
16use tempo_node::rpc::consensus::{CertifiedBlock, Event, Query, TempoConsensusApiClient};
17use tempo_telemetry_util::display_duration;
18use tokio::{
19 select,
20 sync::{mpsc, oneshot},
21};
22use tracing::{debug, debug_span, instrument, warn, warn_span};
23
24use crate::utils::OptionFuture;
25
26pub(super) type EventStream =
27 Either<stream::Empty<Result<Event, serde_json::Error>>, Fuse<Subscription<Event>>>;
28
/// Seconds of reconnect backoff added for every failed connection attempt.
const RECONNECT_BACKOFF_FACTOR: u64 = 2;
/// Upper bound on the backoff portion of a reconnect delay.
const RECONNECT_MAX_BACKOFF: Duration = Duration::from_secs(20);
/// Upper bound on the random jitter added on top of the reconnect backoff.
const RECONNECT_JITTER: Duration = Duration::from_secs(1);
32
33pub(crate) struct Actor<TContext> {
38 pub(super) context: ContextCell<TContext>,
39 pub(super) connection: Option<Arc<WsClient>>,
40 pub(super) mailbox: mpsc::UnboundedReceiver<super::ingress::Message>,
41 pub(super) url: &'static str,
42 pub(super) pending_connect: OptionFuture<BoxFuture<'static, (u64, eyre::Result<WsClient>)>>,
43 pub(super) pending_stream:
44 OptionFuture<BoxFuture<'static, Result<Subscription<Event>, client::Error>>>,
45 pub(super) event_stream: EventStream,
46 pub(super) waiters: Vec<(Height, oneshot::Sender<Option<CertifiedBlock>>)>,
48}
49
50impl<TContext> Actor<TContext>
51where
52 TContext: Clock + Metrics + Spawner,
53{
54 pub(crate) fn start(
55 mut self,
56 reporter: impl Reporter<Activity = Event>,
57 ) -> commonware_runtime::Handle<()> {
58 spawn_cell!(self.context, self.run(reporter))
59 }
60
61 async fn run(mut self, mut reporter: impl Reporter<Activity = Event>) {
62 loop {
63 self.reconnect_or_resubscribe();
64 self.drain_waiters();
65
66 select!(
67 biased;
68
69 (attempts, client) = &mut self.pending_connect => {
70 match client {
71 Ok(client) => {
72 let client = Arc::new(client);
73 self.connection.replace(client);
74 }
75 Err(reason) => {
76 let reconnect_in = reconnect_delay(attempts);
77 warn_span!("reconnect").in_scope(|| warn!(
78 %reason,
79 attempts,
80 reconnect_in = %display_duration(reconnect_in),
81 url = self.url,
82 "connecting to upstream node failed, attempting again",
83 ));
84 self.pending_connect.replace({
85 let context = self.context.clone();
86 let url = self.url;
87 async move {
88 context.sleep(reconnect_in).await;
89 connect(url, attempts.saturating_add(1)).await
90 }.boxed()
91 });
92 }
93 }
94 }
95
96 stream = &mut self.pending_stream => {
97 match stream {
98 Ok(stream) => {
99 debug_span!("consensus_event_subscription")
100 .in_scope(|| debug!("subscription for consensus events established"));
101 self.event_stream = active_event_stream(stream);
102 }
103 Err(error) => {
104 warn_span!("event_subscription").in_scope(|| warn!(
105 reason = %Report::new(error),
106 "failed subscribing to events; reconnecting to upstream node"
107 ));
108 self.connection.take();
109 self.event_stream = inactive_event_stream();
110 }
111 }
112 }
113
114 event = self.event_stream.next(), if !self.event_stream.is_terminated() => {
115 match event {
116 Some(Ok(event)) => {
117 debug_span!("consensus_event").in_scope(|| debug!(
118 ?event, "received consensus event, forwarding to reporter"
119 ));
120 reporter.report(event).await;
121 }
122 Some(Err(error)) => {
123 warn_span!("event").in_scope(|| warn!(
124 %error,
125 "event stream encountered an error",
126 ));
127 self.event_stream = inactive_event_stream();
128 }
129 None => {
130 warn_span!("event_subscription").in_scope(|| warn!(
131 url = self.url,
132 "event stream terminated",
133 ));
134 self.event_stream = inactive_event_stream();
135 }
136 }
137 }
138
139 Some(msg) = self.mailbox.recv() => {
140 match msg {
141 super::ingress::Message::GetFinalization { height, response, } => {
142 self.waiters.push((height, response));
143 }
144 }
145 }
146 );
147 }
148 }
149
150 #[instrument(skip_all)]
151 fn reconnect_or_resubscribe(&mut self) {
152 if self.pending_connect.is_some() || self.pending_stream.is_some() {
153 return;
154 }
155
156 let Some(client) = self.connection.clone() else {
157 self.pending_connect.replace(connect(self.url, 1));
158 return;
159 };
160
161 if !self.event_stream.is_terminated() {
162 return;
163 }
164
165 if client.is_connected() {
166 self.pending_stream.replace(subscribe(client));
167 } else {
168 warn!(url = self.url, "upstream client disconnected, reconnecting");
169 self.connection.take();
170 self.pending_connect.replace(connect(self.url, 1));
171 }
172 }
173
174 fn drain_waiters(&mut self) {
178 if self.pending_connect.is_some()
179 || self.pending_stream.is_some()
180 || self.event_stream.is_terminated()
181 {
182 return;
183 }
184
185 let Some(client) = &self.connection else {
186 return;
187 };
188 if !client.is_connected() {
189 return;
190 }
191
192 for (height, response) in self.waiters.drain(..) {
193 let client = client.clone();
194 self.context
195 .with_label("get_finalization")
196 .spawn(move |_| get_finalization(client, height, response));
197 }
198 }
199}
200
201fn connect(url: &'static str, attempts: u64) -> BoxFuture<'static, (u64, eyre::Result<WsClient>)> {
202 async move {
203 (
204 attempts,
205 WsClientBuilder::default()
206 .build(&url)
207 .await
208 .map_err(Report::new),
209 )
210 }
211 .boxed()
212}
213
214fn subscribe(
215 client: Arc<WsClient>,
216) -> BoxFuture<'static, Result<Subscription<Event>, client::Error>> {
217 async move { client.subscribe_events().await }.boxed()
218}
219
220pub(super) fn inactive_event_stream() -> EventStream {
221 Either::Left(stream::empty())
222}
223
224fn active_event_stream(stream: Subscription<Event>) -> EventStream {
225 Either::Right(stream.fuse())
226}
227
228fn reconnect_delay(attempts: u64) -> Duration {
229 reconnect_backoff(attempts) + random_jitter()
230}
231
232fn reconnect_backoff(attempts: u64) -> Duration {
233 let backoff_secs = attempts.saturating_mul(RECONNECT_BACKOFF_FACTOR);
234 let backoff = Duration::from_secs(backoff_secs);
235
236 backoff.min(RECONNECT_MAX_BACKOFF)
237}
238
239fn random_jitter() -> Duration {
240 let max_jitter_millis = RECONNECT_JITTER.as_millis() as u64;
241 Duration::from_millis(rand_08::thread_rng().gen_range(0..=max_jitter_millis))
242}
243
244#[instrument(skip_all, fields(%height), err)]
245async fn get_finalization(
246 client: Arc<WsClient>,
247 height: Height,
248 response: oneshot::Sender<Option<CertifiedBlock>>,
249) -> eyre::Result<()> {
250 let finalization = client
253 .get_finalization(Query::Height(height.get()))
254 .await
255 .wrap_err("failed getting finalization")?;
256 response
257 .send(Some(finalization))
258 .map_err(|_| eyre::eyre!("receiver went away"))
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn reconnect_backoff_linearly_increases_and_caps() {
        for (attempts, secs) in [(0, 0), (1, 2), (2, 4), (3, 6), (4, 8), (5, 10)] {
            assert_eq!(
                reconnect_backoff(attempts),
                Duration::from_secs(secs),
                "unexpected backoff for attempts = {attempts}",
            );
        }
        for attempts in [10, u64::MAX] {
            assert_eq!(reconnect_backoff(attempts), RECONNECT_MAX_BACKOFF);
        }
    }

    /// Jitter must never exceed the configured upper bound.
    #[test]
    fn random_jitter_is_bounded() {
        for _ in 0..1_000 {
            assert!(random_jitter() <= RECONNECT_JITTER);
        }
    }

    /// The full delay is the backoff plus at most `RECONNECT_JITTER`.
    #[test]
    fn reconnect_delay_is_backoff_plus_bounded_jitter() {
        for attempts in [0, 1, 5, 10, u64::MAX] {
            let backoff = reconnect_backoff(attempts);
            let delay = reconnect_delay(attempts);
            assert!(delay >= backoff, "delay below backoff for attempts = {attempts}");
            assert!(
                delay <= backoff + RECONNECT_JITTER,
                "delay exceeds backoff + jitter for attempts = {attempts}",
            );
        }
    }
}