1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
use densevec::DenseVec;
use crate::stream_rc::StreamRc;
use crate::sender_rc::{SenderRc, stream_rc};
use crate::StreamExt;

/// Bookkeeping for streams keyed by a numeric priority.
///
/// Three tables, all indexed by priority, are maintained:
/// the "non attended" source streams, the sender halves created by
/// [`Priority::event_stream`], and the streams registered for each
/// requested priority.
pub struct Priority<'a, T> {
    /// Source streams registered through `new` / `set_non_attended`.
    non_attended: DenseVec<StreamRc<'a, T>>,
    /// Sender half of every pair created by `event_stream`.
    senders: DenseVec<SenderRc<'a, T>>,
    /// Stream stored for each priority that has been requested so far.
    streams: DenseVec<StreamRc<'a, T>>,
}

impl<'a, T: Clone + 'a> Priority<'a, T> {
    /// Creates a `Priority` whose non attended table holds `priority_0`
    /// under key `0`; the sender and stream tables start out empty.
    pub fn new(priority_0: StreamRc<'a, T>) -> Self {
        let mut created = Self {
            non_attended: DenseVec::new(),
            senders: DenseVec::new(),
            streams: DenseVec::new(),
        };
        created.non_attended.insert(0, priority_0);
        created
    }

    /// Registers `non_attended` as the non attended stream for `priority`.
    ///
    /// If a sender is already stored under a key greater than `priority`,
    /// the first such sender found while iterating is cloned, an `on_value`
    /// callback that sends every value to it is attached to a clone of the
    /// new stream, and the result is stored in the stream table under that
    /// sender's key.
    ///
    /// # Panics
    ///
    /// Panics when a non attended stream is already registered for
    /// `priority`.
    pub fn set_non_attended<S>(&mut self, priority: usize, non_attended: S)
    where
        S: Into<StreamRc<'a, T>>,
    {
        assert!(
            !self.non_attended.contains_key(priority),
            "Trying to insert already existing non attended priority"
        );

        let source = non_attended.into();

        // Only the first sender with a key above `priority` is wired up.
        let downstream = self
            .senders
            .iter()
            .find(|(prio, _)| *prio > priority)
            .map(|(prio, next_sender)| (prio, next_sender.clone()));

        if let Some((target, sender)) = downstream {
            // NOTE(review): this stores under a key that `event_stream` has
            // already populated — presumably replacing that entry; confirm
            // against `DenseVec::insert`.
            let forwarding = source.clone().on_value(move |t| sender.send(t)).rc();
            self.streams.insert(target, forwarding);
        }

        self.non_attended.insert(priority, source);
    }

    /// Returns the event stream for `priority`.
    ///
    /// When a stream is already stored for `priority`, a clone of it is
    /// returned. Otherwise a fresh sender/stream pair is created with
    /// `stream_rc`, the sender is recorded, and the receiving stream is
    /// returned. If a non attended stream with a key below `priority`
    /// exists (the last one met before iteration reaches a key that is not
    /// below `priority`), a clone of it gets an `on_value` callback sending
    /// each value to the new sender and that stream is what is stored;
    /// with no such source the new receiving stream itself is stored.
    pub fn event_stream(&mut self, priority: usize) -> StreamRc<'a, T> {
        if let Some(cached) = self.streams.get(priority) {
            return cached.clone();
        }

        // Walk the sources while their key is below `priority` and keep the
        // last one seen.
        let upstream = self
            .non_attended
            .iter()
            .take_while(|(prio, _)| *prio < priority)
            .last()
            .map(|(_, source)| source);

        let (sender, stream) = stream_rc();
        self.senders.insert(priority, sender.clone());

        // NOTE(review): with an upstream source the stored stream is the
        // `on_value` one, not the receiving `stream` returned below, so a
        // later call for the same priority hands out a different stream
        // than the first call did — confirm this is intended.
        let registered = match upstream {
            Some(source) => source.clone().on_value(move |t| sender.send(t)).rc(),
            None => stream.clone(),
        };
        self.streams.insert(priority, registered);

        stream
    }
}