subscription_service/
lib.rs1#![forbid(unsafe_code)]
9
10use anyhow::Result;
13use channel::{
14 diem_channel::{self, Receiver, Sender},
15 message_queues::QueueStyle,
16};
17use diem_types::{
18 event::EventKey,
19 on_chain_config::{ConfigID, OnChainConfigPayload},
20};
21use std::collections::HashSet;
22
23pub struct SubscriptionService<T, U> {
24 pub name: String,
25 subscribed_items: T,
26 sender: Sender<(), U>,
27}
28
29impl<T: Clone, U> SubscriptionService<T, U> {
30 pub fn subscribe(name: &str, items: T) -> (Self, Receiver<(), U>) {
34 let (sender, receiver) = diem_channel::new(QueueStyle::LIFO, 1, None);
35 (
36 Self {
37 name: name.to_string(),
38 sender,
39 subscribed_items: items,
40 },
41 receiver,
42 )
43 }
44
45 pub fn publish(&mut self, payload: U) -> Result<()> {
46 self.sender.push((), payload)
47 }
48
49 pub fn subscribed_items(&self) -> T { self.subscribed_items.clone() }
50}
51
52pub type ReconfigSubscription =
57 SubscriptionService<SubscriptionBundle, OnChainConfigPayload>;
58
59#[derive(Clone)]
60pub struct SubscriptionBundle {
61 pub configs: HashSet<ConfigID>,
62 pub events: HashSet<EventKey>,
63}
64
65impl SubscriptionBundle {
66 pub fn new(configs: Vec<ConfigID>, events: Vec<EventKey>) -> Self {
67 let configs = configs.into_iter().collect::<HashSet<_>>();
68 let events = events.into_iter().collect::<HashSet<_>>();
69
70 Self { configs, events }
71 }
72}
73
74impl ReconfigSubscription {
75 pub fn subscribe_all(
80 name: &str, configs: Vec<ConfigID>, events: Vec<EventKey>,
81 ) -> (Self, Receiver<(), OnChainConfigPayload>) {
82 let bundle = SubscriptionBundle::new(configs, events);
83 Self::subscribe(name, bundle)
84 }
85}