1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0
// Copyright 2021 Conflux Foundation. All rights reserved.
// Conflux is free software and distributed under GNU General Public License.
// See http://www.gnu.org/licenses/
#![forbid(unsafe_code)]
//! Generic pub/sub service framework
use anyhow::Result;
use channel::{
diem_channel::{self, Receiver, Sender},
message_queues::QueueStyle,
};
use diem_types::{
event::EventKey,
on_chain_config::{ConfigID, OnChainConfigPayload},
};
use std::collections::HashSet;
/// A named subscription over a set of items of type `T`, paired with the
/// sending end of a channel that carries payloads of type `U`.
///
/// Instances are created with `subscribe`, which also hands back the matching
/// receiving end of the channel.
pub struct SubscriptionService<T, U> {
/// Name given to this subscription when it was created.
pub name: String,
/// The items this subscription covers; exposed through `subscribed_items`.
subscribed_items: T,
/// Sending end of the channel; `publish` pushes payloads through it using
/// the unit value `()` as the first argument.
sender: Sender<(), U>,
}
impl<T: Clone, U> SubscriptionService<T, U> {
    /// Creates a subscription named `name` that covers `items`.
    ///
    /// A new `diem_channel` is opened with `QueueStyle::LIFO`, a size argument
    /// of `1` and `None` as the last argument. The sending half is stored in
    /// the returned service, while the receiving half is returned alongside it
    /// so the caller can read whatever is later passed to `publish`.
    pub fn subscribe(name: &str, items: T) -> (Self, Receiver<(), U>) {
        let (tx, rx) = diem_channel::new(QueueStyle::LIFO, 1, None);
        let service = Self {
            name: name.to_owned(),
            subscribed_items: items,
            sender: tx,
        };
        (service, rx)
    }

    /// Pushes `payload` into the channel held by this service.
    ///
    /// The outcome of the underlying `Sender::push` call is returned
    /// unchanged.
    pub fn publish(&mut self, payload: U) -> Result<()> {
        let Self { sender, .. } = self;
        sender.push((), payload)
    }

    /// Returns a clone of the items this service was subscribed with.
    pub fn subscribed_items(&self) -> T {
        T::clone(&self.subscribed_items)
    }
}
/// A subscription service for on-chain reconfiguration notifications from
/// state sync.
///
/// This is declared/implemented here instead of in `types/on_chain_config`
/// because when the `subscription_service` crate is a dependency of `types`,
/// the build-dev fails.
pub type ReconfigSubscription =
SubscriptionService<SubscriptionBundle, OnChainConfigPayload>;
/// The set of on-chain configs and events that a `ReconfigSubscription`
/// subscribes to.
#[derive(Clone)]
pub struct SubscriptionBundle {
/// IDs of the subscribed configs, stored as a set.
pub configs: HashSet<ConfigID>,
/// Keys of the subscribed events, stored as a set.
pub events: HashSet<EventKey>,
}
impl SubscriptionBundle {
pub fn new(configs: Vec<ConfigID>, events: Vec<EventKey>) -> Self {
let configs = configs.into_iter().collect::<HashSet<_>>();
let events = events.into_iter().collect::<HashSet<_>>();
Self { configs, events }
}
}
impl ReconfigSubscription {
    /// Creates a subscription service named `name` that subscribes to changes
    /// in the configs listed in `configs` and to emission of the events listed
    /// in `events`.
    ///
    /// Returns the subscription service together with the endpoint that
    /// listens to it.
    pub fn subscribe_all(
        name: &str, configs: Vec<ConfigID>, events: Vec<EventKey>,
    ) -> (Self, Receiver<(), OnChainConfigPayload>) {
        Self::subscribe(name, SubscriptionBundle::new(configs, events))
    }
}