Skip to main content

tempo_commonware_node/follow/upstream/
mod.rs

1//! Actors to communicate with the upstream node.
2//!
3//! Maintains a regular connection to an upstream node over websocket
4//! or `in_process::Actor` as an in-process actor working off of channels.
5
6use commonware_consensus::Reporter;
7use commonware_runtime::{Clock, ContextCell, Metrics, Spawner};
8use futures::{stream, stream::StreamExt as _};
9use tempo_node::rpc::consensus::Event;
10use tokio::sync::mpsc;
11
12use crate::utils::OptionFuture;
13
14mod actor;
15pub mod in_process;
16mod ingress;
17
18pub(crate) use actor::Actor;
19pub use ingress::Mailbox;
20
/// An actor that can be started with reporters that receive consensus RPC events.
///
/// Implemented in this module for both [`Actor`] and [`in_process::Actor`], so
/// callers can start either kind of upstream actor through one interface.
pub trait UpstreamActor: Send + 'static {
    /// Consumes the actor and starts it with `reporter`, which accepts
    /// [`Event`] activities. Returns the runtime handle produced by starting
    /// the actor.
    fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()>;
}
25
26impl<TContext> UpstreamActor for Actor<TContext>
27where
28    TContext: Clock + Metrics + Spawner,
29{
30    fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
31        self.start(reporter)
32    }
33}
34
35impl<TContext> UpstreamActor for in_process::Actor<TContext>
36where
37    TContext: Clock + Metrics + Spawner,
38{
39    fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
40        self.start(reporter)
41    }
42}
43
44pub(crate) fn init<TContext>(
45    context: TContext,
46    config: Config,
47) -> (Actor<TContext>, ingress::Mailbox) {
48    let (tx, rx) = mpsc::unbounded_channel();
49    let mailbox = ingress::Mailbox::new(tx);
50
51    let url = Box::leak(Box::<str>::from(config.upstream_url));
52    let actor = Actor {
53        context: ContextCell::new(context),
54        connection: None,
55        mailbox: rx,
56        url,
57        pending_connect: OptionFuture::none(),
58        pending_stream: OptionFuture::none(),
59        event_stream: stream::empty::<Result<Event, serde_json::Error>>()
60            .boxed()
61            .fuse(),
62        waiters: Vec::new(),
63    };
64
65    (actor, mailbox)
66}
67
/// Configuration consumed by [`init`] when constructing the upstream [`Actor`].
pub(crate) struct Config {
    /// The URL to connect to.
    pub(crate) upstream_url: String,
}