1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::cell::UnsafeCell;
use std::rc::Rc;
use std::os::raw::c_void;

use crate::Sender;
use crate::priv_traits::*;

pub struct Dedup<'a, T>{
    last_value: Option<T>,
    sender: Sender<'a,T>,
}

impl<'a, T> Dedup<'a,T>{
    pub fn new() -> Dedup<'a,T>{
        Dedup{
            last_value: None,
            sender: Sender::new(),
        }
    }
}

impl<'a, T> StreamInner<'a,T> for Dedup<'a,T>{
    #[inline]
    fn push(&mut self, st: Rc<UnsafeCell<dyn SinkInner<'a,T>+'a>>){
        self.sender.push(st)
    }

    #[inline]
    fn send(&mut self, t: T){
        self.sender.send(t)
    }
}

impl<'a, T> Remove for Dedup<'a,T>{
    fn remove_raw(&mut self, st: *const c_void){
        self.sender.remove_raw(st)
    }
}

impl<'a, T: PartialEq + Clone> SinkInner<'a, T> for Dedup<'a,T>{
    #[inline]
    fn call(&mut self, t: T) -> Result<(),()>{
        if let Some(prev) = self.last_value.as_ref(){
            if *prev == t {
                return Ok(())
            }
        }

        self.sender.send(t.clone());
        self.last_value = Some(t);
        Ok(())
    }

    #[inline]
    fn is_alive(&self) -> bool {
        true
    }
}


pub struct DedupBy<'a, T, F, T2>{
    last_value: Option<T2>,
    f: F,
    sender: Sender<'a,T>,
}

impl<'a,T, F, T2> DedupBy<'a,T, F, T2>{
    pub fn new(f: F) -> DedupBy<'a,T, F, T2>{
        DedupBy{
            last_value: None,
            f,
            sender: Sender::new(),
        }
    }
}

impl<'a,T, F, T2> StreamInner<'a,T> for DedupBy<'a,T, F, T2>{
    #[inline]
    fn push(&mut self, st: Rc<UnsafeCell<dyn SinkInner<'a,T>+'a>>){
        self.sender.push(st)
    }

    #[inline]
    fn send(&mut self, t: T){
        self.sender.send(t)
    }
}

impl<'a,T, F, T2> Remove for DedupBy<'a,T, F, T2>{
    fn remove_raw(&mut self, st: *const c_void){
        self.sender.remove_raw(st)
    }
}

impl<'a,T, T2: PartialEq, F: Fn(&T) -> T2> SinkInner<'a, T> for DedupBy<'a,T, F, T2>{
    #[inline]
    fn call(&mut self, t: T) -> Result<(),()>{
        let next = (self.f)(&t);
        if let Some(prev) = self.last_value.as_ref(){
            if *prev == next {
                return Ok(())
            }
        }

        self.sender.send(t);
        self.last_value = Some(next);
        Ok(())
    }

    #[inline]
    fn is_alive(&self) -> bool {
        true
    }
}