use std::cell::UnsafeCell;
use std::rc::Rc;
use std::mem;
use std::os::raw::c_void;
use std::ops::Not;
use crate::{PropertyLastValue, priv_traits::*};
use crate::utils::*;
use crate::sender::SenderInner;
use crate::Property;
use crate::traits::StreamExt;
#[cfg(feature="numbers")]
use crate::ranged_property::{RangedPropertyMut, Scale};
#[cfg(feature="numbers")]
use num_traits::{ToPrimitive,FromPrimitive};
#[cfg(feature="numbers")]
use std::ops::Sub;
/// Handle to one node of a stream chain.
///
/// Dropping the handle sends the optional `on_drop` value through the node
/// and then asks the parent (when there is one) to remove the node.
#[must_use = "dropping a stream unsubscribes it and all the chain until the origin sender"]
pub struct Stream<'a, T2>{
    /// Upstream handle, if any; `Drop` calls `remove_raw` on it with the
    /// address of `inner`.
    _parent: Option<Rc<UnsafeCell<dyn Remove + 'a>>>,
    /// Node that values are pushed into and sent through.
    inner: Rc<UnsafeCell<dyn StreamInner<'a, T2> + 'a>>,
    /// The constructors in this file fill this with the same allocation as
    /// `inner`, viewed through the `Remove` trait.
    remove: Rc<UnsafeCell<dyn Remove + 'a>>,
    /// Value sent through `inner` when the stream is dropped, if one was set.
    on_drop: Option<T2>,
    /// Optional label assigned through `set_name`.
    #[cfg(feature="named")]
    name: Option<String>,
}
impl<'a, T2> Drop for Stream<'a,T2>{
fn drop(&mut self){
unsafe{
match self.on_drop.take(){
Some(t) => (*self.inner.get()).send(t),
_ => (),
}
if let Some(ref mut _parent) = self._parent{
(*_parent.get()).remove_raw(self.inner.get() as *const c_void)
}
}
}
}
impl<'a, T2> Stream<'a, T2> {
    /// Builds a stream whose `inner` node is first pushed into `parent`'s
    /// inner node; `parent` itself is then kept alive as this stream's parent
    /// handle.
    pub fn from_parent<'p, T1, S: StreamExt<'p, T1>, Inner: StreamInner<'a,T2> + SinkInner<'a,T1> + 'a>(parent: S, inner: Rc<UnsafeCell<Inner>>) -> Stream<'a,T2>{
        {
            // Scoped so the extra handle to the parent's node is released
            // before `parent` is consumed below.
            let upstream = parent.inner_rc();
            // SAFETY: NOTE(review) - assumes no other reference into the
            // parent's cell is live while the sink is pushed.
            unsafe {
                (*upstream.get()).push(transform_sink_lifetime(inner.clone()));
            }
        }

        // The transmute only re-labels the trait object's lifetime ('p -> 'a).
        let parent_handle: Rc<UnsafeCell<dyn Remove + 'a>> =
            unsafe { mem::transmute(parent.into_remove()) };
        let remove = inner.clone();

        Stream {
            _parent: Some(parent_handle),
            inner,
            remove,
            on_drop: None,
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Builds a stream around `inner` that keeps `remove` as its parent
    /// handle. Nothing is pushed anywhere by this constructor.
    pub fn from_remove<'p, T1, S: Remove + 'a, Inner: StreamInner<'a,T2> + SinkInner<'a,T1> + 'a>(remove: S, inner: Rc<UnsafeCell<Inner>>) -> Stream<'a,T2>{
        let parent_handle: Rc<UnsafeCell<dyn Remove + 'a>> = Rc::new(UnsafeCell::new(remove));
        let own_remove = inner.clone();

        Stream {
            _parent: Some(parent_handle),
            inner,
            remove: own_remove,
            on_drop: None,
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Builds a parentless stream around `inner`.
    pub fn new<Inner: StreamInner<'a,T2> + 'a>(inner: Rc<UnsafeCell<Inner>>) -> Stream<'a,T2>{
        let remove = inner.clone();

        Stream {
            _parent: None,
            inner,
            remove,
            on_drop: None,
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Sends `t` through this stream's inner node.
    pub fn send(&self, t: T2){
        let node = self.inner.get();
        // SAFETY: NOTE(review) - assumes no other reference into the cell is
        // live during the call.
        unsafe { (*node).send(t) }
    }

    /// Registers `value` to be sent when this stream is dropped, replacing any
    /// previously registered value, and returns the stream.
    pub fn on_drop(mut self, value: T2) -> Stream<'a,T2>{
        self.on_drop = Some(value);
        self
    }

    /// Stores an owned copy of `name` on the stream and returns the stream.
    #[cfg(feature="named")]
    pub fn set_name(mut self, name: &str) -> Stream<'a,T2>{
        self.name = Some(String::from(name));
        self
    }
}
impl<'a, T2: 'a> Stream<'a, T2> {
    /// Creates a parentless stream backed by a fresh `SenderInner`.
    pub fn never() -> Stream<'a,T2>{
        Stream::new(Rc::new(UnsafeCell::new(SenderInner::new())))
    }

    /// Creates one stream fed by every stream in `streams`.
    ///
    /// A single fresh `SenderInner` is pushed as a sink into each source, and
    /// the sources themselves are kept (as a `Vec` of remove handles) as the
    /// parent of the returned stream.
    pub fn merge_all<'b, S, I>(streams: I) -> Stream<'a,T2>
    where S: StreamExt<'b,T2>,
          I: IntoIterator<Item = S>
    {
        let sources: Vec<S> = streams.into_iter().collect();
        let inner = Rc::new(UnsafeCell::new(SenderInner::new()));

        // First pass: subscribe the shared node to every source.
        for source in &sources {
            // SAFETY: NOTE(review) - assumes no other reference into the
            // source's cell is live while the sink is pushed.
            unsafe {
                (*source.inner_rc().get()).push(transform_sink_lifetime(inner.clone()))
            }
        }

        // Second pass: consume the sources, keeping them as remove handles.
        // The transmute only re-labels the trait object's lifetime.
        let mut parents: Vec<Rc<UnsafeCell<dyn Remove>>> = Vec::with_capacity(sources.len());
        for source in sources {
            parents.push(unsafe { mem::transmute(source.into_remove()) });
        }

        Stream::from_remove(parents, inner)
    }
}
impl<'a, T2: Clone + 'a> Stream<'a, T2> {
    /// Calls `set` on a clone of `property` for every value of this stream and
    /// returns the stream produced by `on_value`.
    pub fn connect_to_property<'b,'c>(self, property: &mut Property<'a, T2>) -> Stream<'a, T2> where T2: 'c, Self: Sized{
        let mut target = property.clone();
        self.on_value(move |value| target.set(value))
    }

    /// Same as `connect_to_property`, but for a `PropertyLastValue`.
    pub fn connect_to_property_last_value<'b,'c>(self, property: &mut PropertyLastValue<'a, T2>) -> Stream<'a, T2> where T2: 'c, Self: Sized{
        let mut target = property.clone();
        self.on_value(move |value| target.set(value))
    }

    /// Re-parents this stream onto `parent`.
    ///
    /// The current parent (if any) is asked to remove this stream's node; then
    /// an `on_value` stream over `parent` that forwards every value into this
    /// stream's node is stored as the new parent handle.
    pub fn swap_parent<S: StreamExt<'a, T2>>(&mut self, parent: S){
        if let Some(old_parent) = self._parent.as_ref() {
            let node = self.inner.get() as *const c_void;
            // SAFETY: NOTE(review) - assumes no other reference into the old
            // parent's cell is live during the call.
            unsafe { (*old_parent.get()).remove_raw(node) }
        }

        let forward_to = self.inner.clone();
        // SAFETY (closure body): NOTE(review) - assumes the node is not
        // otherwise borrowed while a value is being forwarded.
        let relay = parent.on_value(move |value| unsafe { (*forward_to.get()).send(value) });
        self._parent = Some(relay.into_remove());
    }
}
impl<'a> Stream<'a, f64> {
    /// Maps each incoming value onto `param`'s range and connects the result
    /// to `param`'s property.
    ///
    /// Each value is used as-is for `Scale::Linear` and squared for
    /// `Scale::Log`, then multiplied by `param.diff()` and offset by
    /// `param.min()`.
    ///
    /// # Panics
    ///
    /// Panics if `diff`/`min` cannot be converted to `f64`, or if a computed
    /// value cannot be converted to `T`.
    #[cfg(feature="numbers")]
    pub fn connect_to_ranged_parameter_pct<T,R>(self, param: &mut RangedPropertyMut<'a,T,R>) -> Stream<'a,T>
    where T: Clone + FromPrimitive + ToPrimitive,
          R: Sub<Output=R> + ToPrimitive + Clone + 'static
    {
        let scale = param.scale();
        let diff = param.diff().to_f64().unwrap();
        let min = param.min().to_f64().unwrap();

        let scaled = self.map(move |pct| {
            let fraction = pct.to_f64().unwrap();
            let shaped = match scale {
                Scale::Linear => fraction,
                Scale::Log => fraction * fraction,
            };
            T::from_f64(shaped * diff + min).unwrap()
        });

        let mut target = param.clone().into_property();
        scaled.connect_to_property(&mut target)
    }
}
impl<'a> Not for Stream<'a, bool> {
    type Output = Stream<'a,bool>;

    /// `!stream`: maps every boolean value to its negation.
    fn not(self) -> Stream<'a,bool>{
        self.map(|value| !value)
    }
}
impl<'a> Stream<'a, bool>{
pub fn not<'b>(self) -> Stream<'b,bool> where 'a: 'b{
self.map(|b| !b)
}
pub fn and_then<T: 'a, S: StreamExt<'a,T>>(self, then: S) -> Stream<'a, T>{
then.filter_by(self.to_property(false))
}
pub fn switch<T: 'a + Clone, S: StreamExt<'a,T>>(self, then: S, else_: S) -> Stream<'a,T>{
let switch = self.to_property(false);
let then = then.filter_by(switch.clone());
let else_ = else_.filter_by(!switch);
then.merge(else_)
}
}
impl<'a, T2: 'a> WithInner<'a, T2> for Stream<'a, T2> {
    /// Returns another handle to this stream's inner node.
    #[inline]
    fn inner_rc(&self) -> Rc<UnsafeCell<dyn StreamInner<'a,T2> + 'a>>{
        Rc::clone(&self.inner)
    }

    /// Returns another handle to this stream's `remove` object.
    #[inline]
    fn remove_rc(&self) -> Rc<UnsafeCell<dyn Remove + 'a>>{
        Rc::clone(&self.remove)
    }

    /// Moves the whole stream into a new `Rc<UnsafeCell<_>>`, viewed as a
    /// `StreamInner` trait object.
    #[inline]
    fn into_inner(self) -> Rc<UnsafeCell<dyn StreamInner<'a,T2> + 'a>>{
        let boxed: Rc<UnsafeCell<dyn StreamInner<'a,T2> + 'a>> = Rc::new(UnsafeCell::new(self));
        boxed
    }

    /// Moves the whole stream into a new `Rc<UnsafeCell<_>>`, viewed as a
    /// `Remove` trait object.
    #[inline]
    fn into_remove(self) -> Rc<UnsafeCell<dyn Remove + 'a>>{
        let boxed: Rc<UnsafeCell<dyn Remove + 'a>> = Rc::new(UnsafeCell::new(self));
        boxed
    }
}
impl<'a, T> Remove for Stream<'a, T> {
    /// Forwards the removal request for `st` to the inner node.
    #[inline]
    fn remove_raw(&mut self, st: *const c_void){
        let node = self.inner.get();
        // SAFETY: NOTE(review) - assumes no other reference into the cell is
        // live during the call.
        unsafe { (*node).remove_raw(st) }
    }
}
impl<'a, T> StreamInner<'a, T> for Stream<'a, T> {
    /// Forwards the sink `st` to the inner node's `push`.
    #[inline]
    fn push(&mut self, st: Rc<UnsafeCell<dyn SinkInner<'a,T> + 'a>>){
        let node = self.inner.get();
        // SAFETY: NOTE(review) - assumes no other reference into the cell is
        // live during the call.
        unsafe { (*node).push(st) }
    }

    /// Forwards the value `t` to the inner node's `send`.
    #[inline]
    fn send(&mut self, t: T){
        let node = self.inner.get();
        // SAFETY: NOTE(review) - same aliasing assumption as in `push`.
        unsafe { (*node).send(t) }
    }
}
impl<'a, T2: 'a> StreamExt<'a, T2> for Stream<'a, T2> {
    /// Returns the stream itself unchanged.
    fn unique(self) -> ::Stream<'a,T2>{
        self
    }

    /// Converts this stream into a `StreamRc` through its `Into` conversion.
    fn rc(self) -> ::StreamRc<'a,T2> where T2: Clone{
        Into::into(self)
    }
}