use std::cell::UnsafeCell;
use std::rc::Rc;
use std::mem;
#[cfg(feature="numbers")]
use num_traits;
#[cfg(feature="numbers")]
use std::ops::{Add, Sub, Mul, Div};
use std::time;
use crate:: priv_traits::*;
use crate::on_value::OnValue;
use crate::map::Map;
use crate::filter_map::FilterMap;
use crate::fold::Fold;
use crate::scan::Scan;
use crate::property;
use crate::iter::{TryIter, TryIterLastFrame};
use crate::filter::Filter;
use crate::flat_map::FlatMap;
use crate::partition::Partition;
use crate::partition_map::{PartitionMap, Either};
use crate::take::Take;
use crate::take_while::TakeWhile;
use crate::skip_while::SkipWhile;
use crate::skip::Skip;
use crate::dedup::{Dedup, DedupBy};
use crate::delay::Delay;
use crate::inspect::Inspect;
use crate::once::Once;
use crate::utils::*;
use crate::sender_rc::SenderInner;
use crate::parameter::Parameter;
use crate::dedup_or::DedupOr;
use crate::{Stream, StreamRc, stream};
#[cfg(feature = "async")]
use crate::r#async::ReceiverAsync;
pub trait StreamExt<'a, T>: WithInner<'a,T> {
fn on_value<'b, F: FnMut(T) + 'b>(self, f: F) -> Stream<'b,T>
where T: Clone + 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(OnValue::new(f))))
}
fn map<'b, T3, F>(self, f: F) -> Stream<'b,T3>
where
T: 'a,
T3: 'b,
F: FnMut(T) -> T3 + 'b,
'a: 'b,
Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Map::new(f))))
}
fn filter<'b, F: FnMut(&T) -> bool + 'b>(self, f: F) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Filter::new(f))))
}
fn filter_by<'b,'c>(self, p: property::Property<'b, bool>) -> Stream<'c, T>
where
T: 'a,
'a: 'b,
'b: 'c,
Self: Sized
{
self.filter(move |_| *p.as_ref())
}
fn partition<'b, F>(self, f: F) -> (Stream<'b, T>, Stream<'b, T>)
where
F: FnMut(&T) -> bool + 'b,
'a: 'b,
Self: Sized,
T: 'a
{
let (filter, stream_false) = Partition::new(f);
let stream = Stream::from_parent(self, Rc::new(UnsafeCell::new(filter)));
(stream, stream_false)
}
fn filter_map<'b, T3: 'b, F: FnMut(T) -> Option<T3> + 'b>(self, f: F) -> Stream<'b,T3>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(FilterMap::new(f))))
}
fn partition_map<'b, F, L: 'b, R: 'b>(self, f: F) -> (Stream<'b, L>, Stream<'b, R>)
where
F: FnMut(T) -> Either<L,R> + 'b,
'a: 'b,
Self: Sized,
T: 'a,
{
let (filter, stream_non_attended) = PartitionMap::new(f);
let stream = Stream::from_parent(self, Rc::new(UnsafeCell::new(filter)));
(stream, stream_non_attended)
}
fn flat_map<'b, T3: 'b, F: FnMut(T) -> Stream<'b, T3> + 'b>(self, f: F) -> Stream<'b,T3>
where
T: 'a,
T3: Clone,
'a: 'b,
Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(FlatMap::new(f))))
}
fn take<'b>(self, take: usize) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Take::new(take))))
}
fn take_then<'b>(self, take: usize, then: T) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Take::new_with_then(take, then))))
}
fn take_while<'b, F: FnMut(&T) -> bool + 'b>(self, f: F) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(TakeWhile::new(f))))
}
fn take_while_then<'b, F: FnMut(&T) -> bool + 'b>(self, f: F, then: T) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(TakeWhile::new_with_then(f, then))))
}
fn skip<'b>(self, skip: usize) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Skip::new(skip))))
}
fn skip_while<'b, F: FnMut(&T) -> bool + 'b>(self, f: F) -> Stream<'b,T>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(SkipWhile::new(f))))
}
fn fold<'b, T3: 'b, F: FnMut(T3, T) -> T3 + 'b>(self, initial: T3, f: F) -> Stream<'b,T3>
where
T: 'a, 'a: 'b,
T3: Clone,
Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Fold::new(f,initial))))
}
fn scan<'b, T2: 'b, T3: 'b, F: FnMut(&mut T2, T) -> Option<T3> + 'b>(self, initial: T2, f: F) -> Stream<'b,T3>
where T: 'a, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Scan::new(f,initial))))
}
fn dedup<'b>(self) -> Stream<'b,T>
where T: Clone + 'a + PartialEq, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Dedup::new())))
}
fn dedup_by<'b, F, T2>(self, f: F) -> Stream<'b,T>
where T: 'a,
'a: 'b,
Self: Sized,
T2: PartialEq + 'b,
F: Fn(&T) -> T2 + 'b,
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(DedupBy::new(f))))
}
fn dedup_or<'b>(self, or: ::Property<'b, bool>) -> Stream<'b,T>
where
T: 'a + PartialEq + Clone,
'a: 'b,
Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(DedupOr::new(or))))
}
fn delay<'b>(self, delay: usize) -> Stream<'b,T>
where T: 'a + PartialEq, 'a: 'b, Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Delay::new(delay))))
}
fn inspect<'b,F>(self, inspect: F) -> Stream<'b,T>
where T: 'b,
'a: 'b,
F: Fn(&T) + 'b,
Self: Sized{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Inspect::new(inspect))))
}
fn once<'b, T2>(self, once: T2) -> Stream<'b,T2>
where
T: 'a,
T2: 'b,
'a: 'b,
Self: Sized
{
Stream::from_parent(self, Rc::new(UnsafeCell::new(Once::new(once))))
}
fn merge<'b,'c, S: StreamExt<'b,T>>(self, other: S) -> Stream<'c,T>
where T: Clone + 'c, 'a: 'c, 'b: 'c, Self: Sized
{
let inner = Rc::new(UnsafeCell::new(SenderInner::new()));
unsafe{ (*self.inner_rc().get()).push(transform_sink_lifetime(inner.clone())) };
unsafe{ (*other.inner_rc().get()).push(transform_sink_lifetime(inner.clone())) };
let parents: Vec<Rc<UnsafeCell<dyn Remove>>> = unsafe{ vec![
mem::transmute(self.into_remove()),
mem::transmute(other.into_remove())
] };
Stream::from_remove(parents, inner)
}
#[cfg(not(target_arch = "wasm32"))]
fn throttle<'c>(self, fps: f32) -> Stream<'c, T>
where T: 'a, 'a: 'c, Self: Sized
{
let last_time: Option<time::Instant> = None;
let duration = time::Duration::from_nanos((1. / fps * 1_000_000_000.) as u64);
self.scan(last_time, move |last_time, v| {
let now = time::Instant::now();
if let Some(last_time) = last_time{
if now - *last_time > duration {
*last_time = now;
Some(v)
}else{
None
}
}else{
*last_time = Some(now);
Some(v)
}
})
}
#[cfg(target_arch = "wasm32")]
fn throttle(self, fps: f32) -> Self
where Self: Sized
{
self
}
#[cfg(not(target_arch = "wasm32"))]
fn benchmark<'b, 'c, F: 'b, S: StreamExt<'b,F>>(self, frame_stream: S) -> (Stream<'c, T>, Stream<'c, time::Duration>)
where T: Clone + 'a, 'a: 'c, 'b: 'c,
Self: Sized,
{
let (timed_sender, timed_stream) = stream();
let _0 = time::Duration::new(0, 0);
#[derive(Clone)]
enum Event<T>{
Frame,
Other(T)
}
let compute_time = self.map(|t| Event::Other(t))
.merge(frame_stream.map(|_| Event::Frame))
.scan(_0, move |frame_t, e| {
match e {
Event::Frame => {
Some(mem::replace(frame_t, _0))
}
Event::Other(e) => {
let then = time::Instant::now();
timed_sender.send(e);
*frame_t += then.elapsed();
None
}
}
});
(timed_stream, compute_time)
}
#[cfg(feature = "numbers")]
fn bandpass<'b>(self, alpha: T) -> Stream<'b, T>
where T: 'a + num_traits::Zero + num_traits::One + Sub<Output=T> + Add<Output=T> + Mul<Output=T> + Clone,
'a: 'b,
Self: Sized,
{
let _0: T = num_traits::zero();
let _1: T = num_traits::one();
self.fold(_0, move |prev, next| prev.clone() * alpha.clone() + next.clone() * (_1.clone() - alpha.clone()))
}
#[cfg(feature = "numbers")]
fn resolution<'b>(self, resolution: u8) -> Stream<'b, T>
where T: 'a + Div<Output=T> + Mul<Output=T> + num_traits::Float,
'a: 'b,
Self: Sized,
{
let factor = 10u64.pow(resolution as u32);
let factor: T = num_traits::cast(factor).unwrap();
self.map(move |t| {
let u: u64 = num_traits::cast(t * factor).unwrap();
let t: T = num_traits::cast(u).unwrap();
t / factor
})
}
fn to_bool<'b>(self) -> Stream<'b,bool>
where T: 'a, 'a: 'b, Self: Sized
{
self.map(|_| true)
}
fn until<'b, 'c, T2: 'b, S: StreamExt<'b,T2>>(self, until: S) -> Stream<'c, bool>
where Self: Sized, T: 'a, 'a: 'c, 'b: 'c
{
self.to_bool().merge(until.to_bool().not())
}
fn try_iter<'b>(self) -> TryIter<'b, T>
where Self: Sized + 'a, T: 'a, 'a: 'b
{
TryIter::new(self)
}
fn to_parameter(self, initial: T) -> Parameter<'a,T>
where Self: Sized + 'a, T: 'a
{
Parameter::from_parent(self.try_iter(), initial)
}
fn try_iter_last_frame<'b, S>(self, last_frame: S) -> TryIterLastFrame<'b, T>
where Self: Sized + 'a,
T: 'a, 'a: 'b,
S: StreamExt<'a, ()> + 'a
{
TryIterLastFrame::new(self, last_frame)
}
#[cfg(feature = "async")]
fn receiver_async<'b>(self) -> ReceiverAsync<'b, T>
where Self: Sized + 'a, T: 'a, 'a: 'b
{
ReceiverAsync::new(self)
}
fn to_box(self) -> Box<dyn StreamExt<'a,T> + 'a>
where Self: Sized + 'a
{
Box::new(self) as Box<dyn StreamExt<'a,T> + 'a>
}
fn to_property<'b>(self, initial: T) -> ::Property<'a, T>
where T: Clone + 'a, Self: Sized + Into<StreamRc<'a,T>>
{
property::Property::from_parent(self, initial)
}
fn to_property_last_value<'b>(self, initial: T) -> ::PropertyLastValue<'a, T>
where T: Clone + 'a, Self: Sized + Into<StreamRc<'a,T>>
{
::PropertyLastValue::from_parent(self, initial)
}
fn collect<'b>(self, samples: usize) -> ::Property<'b, Vec<T>>
where T: Clone + 'a, 'a: 'b, Self: Sized
{
self.fold(vec![], move |mut vec,v|{
vec.push(v);
if vec.len() > samples{
vec.remove(0);
}
vec
}).to_property(vec![])
}
fn unique(self) -> Stream<'a,T>;
fn rc(self) -> StreamRc<'a,T> where T: Clone;
}
/// Lets a shared reference `&'b S` act as a `WithInner<'b, T>` whenever
/// `S: WithInner<'a, T>` and `'a: 'b`, by delegating to the referenced value
/// and shortening the lifetimes of the returned handles from `'a` to `'b`.
impl<'a, 'b, T, S: WithInner<'a,T>> WithInner<'b, T> for &'b S where 'a: 'b{
/// Returns the referenced value's inner handle, re-typed for `'b`.
fn inner_rc(&self) -> Rc<UnsafeCell<dyn StreamInner<'b,T> + 'b>>{
// NOTE(review): the transmute only changes lifetime parameters ('a -> 'b);
// soundness presumably relies on the `'a: 'b` bound of this impl — confirm.
unsafe{mem::transmute((*self).inner_rc())}
}
/// Returns the referenced value's `Remove` handle, re-typed for `'b`.
fn remove_rc(&self) -> Rc<UnsafeCell<dyn Remove + 'b>>{
// NOTE(review): same lifetime-only transmute as in `inner_rc` — confirm.
unsafe{mem::transmute((*self).remove_rc())}
}
/// By-value variant of `inner_rc`: only the reference is consumed, the
/// handle is obtained through the referenced value's `inner_rc`.
fn into_inner(self) -> Rc<UnsafeCell<dyn StreamInner<'b,T> + 'b>>{
// The annotation pins the source type of the transmute to the `'a` handle.
let inner: Rc<UnsafeCell<dyn StreamInner<'a,T>>> = (*self).inner_rc();
unsafe{ mem::transmute(inner) }
}
/// By-value variant of `remove_rc`: only the reference is consumed, the
/// handle is obtained through the referenced value's `remove_rc`.
fn into_remove(self) -> Rc<UnsafeCell<dyn Remove + 'b>>{
// Unlike `remove_rc` above, no transmute is used here.
(*self).remove_rc()
}
}