tempo_commonware_node/follow/upstream/
mod.rs1use commonware_consensus::Reporter;
7use commonware_runtime::{Clock, ContextCell, Metrics, Spawner};
8use futures::{stream, stream::StreamExt as _};
9use tempo_node::rpc::consensus::Event;
10use tokio::sync::mpsc;
11
12use crate::utils::OptionFuture;
13
14mod actor;
15pub mod in_process;
16mod ingress;
17
18pub(crate) use actor::Actor;
19pub use ingress::Mailbox;
20
21pub trait UpstreamActor: Send + 'static {
23 fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()>;
24}
25
26impl<TContext> UpstreamActor for Actor<TContext>
27where
28 TContext: Clock + Metrics + Spawner,
29{
30 fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
31 self.start(reporter)
32 }
33}
34
35impl<TContext> UpstreamActor for in_process::Actor<TContext>
36where
37 TContext: Clock + Metrics + Spawner,
38{
39 fn start(self, reporter: impl Reporter<Activity = Event>) -> commonware_runtime::Handle<()> {
40 self.start(reporter)
41 }
42}
43
44pub(crate) fn init<TContext>(
45 context: TContext,
46 config: Config,
47) -> (Actor<TContext>, ingress::Mailbox) {
48 let (tx, rx) = mpsc::unbounded_channel();
49 let mailbox = ingress::Mailbox::new(tx);
50
51 let url = Box::leak(Box::<str>::from(config.upstream_url));
52 let actor = Actor {
53 context: ContextCell::new(context),
54 connection: None,
55 mailbox: rx,
56 url,
57 pending_connect: OptionFuture::none(),
58 pending_stream: OptionFuture::none(),
59 event_stream: stream::empty::<Result<Event, serde_json::Error>>()
60 .boxed()
61 .fuse(),
62 waiters: Vec::new(),
63 };
64
65 (actor, mailbox)
66}
67
68pub(crate) struct Config {
69 pub(crate) upstream_url: String,
71}