use std::cell::UnsafeCell;
use std::rc::Rc;
use std::mem;
use std::os::raw::c_void;
use std::ops::Not;
use crate::priv_traits::*;
use crate::utils::*;
use crate::sender_rc::SenderInner;
use crate::traits::StreamExt;
use crate::Property;
#[cfg(feature="numbers")]
use crate::ranged_property::{RangedPropertyMut, Scale};
#[cfg(feature="numbers")]
use num_traits::{ToPrimitive,FromPrimitive};
#[cfg(feature="numbers")]
use std::ops::Sub;
/// Reference-counted handle to a stream of `T2` values.
///
/// Every field is behind an `Rc`, so clones of a handle share the same
/// underlying state. See the `Drop` impl for what happens when a handle goes
/// out of scope.
#[must_use = "dropping a stream unsubscribes it and all the chain until the origin sender"]
pub struct StreamRc<'a, T2>{
    /// Removal handle of the upstream stream, if there is one. The `Drop`
    /// impl uses it to ask the parent to remove `inner`.
    _parent: Option<Rc<UnsafeCell<dyn Remove + 'a>>>,
    /// Node that values are sent through and that sinks are pushed onto.
    inner: Rc<UnsafeCell<dyn StreamInner<'a, T2> + 'a>>,
    /// The constructors in this file store the same allocation as `inner`
    /// here, viewed through the `Remove` trait.
    remove: Rc<UnsafeCell<dyn Remove + 'a>>,
    /// Optional value that the `Drop` impl sends through `inner`.
    on_drop: Rc<UnsafeCell<Option<T2>>>,
    /// Optional label, only present with the `named` feature.
    #[cfg(feature="named")]
    name: Option<String>,
}
impl<'a, T2> Clone for StreamRc<'a, T2>{
fn clone(&self) -> StreamRc<'a, T2>{
StreamRc{
_parent: self._parent.clone(),
inner: self.inner.clone(),
remove: self.remove.clone(),
on_drop: self.on_drop.clone(),
#[cfg(feature="named")]
name: self.name.clone(),
}
}
}
impl<'a, T2> Drop for StreamRc<'a,T2>{
fn drop(&mut self){
unsafe{
if Rc::strong_count(&self.inner) == 3 {
match (*self.on_drop.get()).take(){
Some(t) => (*self.inner.get()).send(t),
_ => (),
}
if let Some(ref mut _parent) = self._parent{
(*_parent.get()).remove_raw(self.inner.get() as *const c_void)
}
}
}
}
}
impl<'a, T2> StreamRc<'a,T2>{
    /// Subscribes `inner` to `parent` and returns a handle that keeps the
    /// parent's removal handle alive.
    ///
    /// The parent's lifetime `'p` is rewritten to `'a` with a transmute.
    /// NOTE(review): soundness of that lifetime change depends on how callers
    /// use this constructor — confirm.
    pub fn from_parent<'p, T1, S: StreamExt<'p, T1>, Inner: StreamInner<'a,T2> + SinkInner<'a,T1> + 'a>(parent: S, inner: Rc<UnsafeCell<Inner>>) -> StreamRc<'a,T2>{
        // Register with the parent first: `into_remove` below consumes it.
        unsafe{
            let upstream = parent.inner_rc();
            (*upstream.get()).push(transform_sink_lifetime(inner.clone()));
        }
        let upstream_remove: Rc<UnsafeCell<dyn Remove + 'a>> =
            unsafe{ mem::transmute(parent.into_remove()) };
        StreamRc{
            _parent: Some(upstream_remove),
            inner: inner.clone(),
            remove: inner,
            on_drop: Rc::new(UnsafeCell::new(None)),
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Builds a handle whose parent is the given `remove` value, wrapped in a
    /// fresh `Rc`. Unlike `from_parent`, nothing is pushed onto any stream
    /// here.
    pub fn from_remove<'p, T1, S: Remove + 'a, Inner: StreamInner<'a,T2> + SinkInner<'a,T1> + 'a>(remove: S, inner: Rc<UnsafeCell<Inner>>) -> StreamRc<'a,T2>{
        let parent_handle: Rc<UnsafeCell<dyn Remove + 'a>> = Rc::new(UnsafeCell::new(remove));
        StreamRc{
            _parent: Some(parent_handle),
            inner: inner.clone(),
            remove: inner,
            on_drop: Rc::new(UnsafeCell::new(None)),
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Builds a handle around `inner` with no parent.
    pub fn new<Inner: StreamInner<'a,T2> + 'a>(inner: Rc<UnsafeCell<Inner>>) -> StreamRc<'a,T2>{
        let drop_slot = Rc::new(UnsafeCell::new(None));
        StreamRc{
            _parent: None,
            inner: inner.clone(),
            remove: inner,
            on_drop: drop_slot,
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Sends `t` through the underlying stream node.
    pub fn send(&self, t: T2){
        let node = self.inner.get();
        unsafe{ (*node).send(t) }
    }

    /// Stores `value` in the shared on-drop slot, replacing any previous
    /// one, and returns the handle. The `Drop` impl sends the stored value
    /// through the stream.
    pub fn on_drop(self, value: T2) -> StreamRc<'a,T2>{
        let slot = self.on_drop.get();
        unsafe{ *slot = Some(value) };
        self
    }

    /// Attaches a name to this handle (only with the `named` feature).
    #[cfg(feature="named")]
    pub fn set_name(mut self, name: &str) -> StreamRc<'a,T2>{
        let owned = name.to_owned();
        self.name = Some(owned);
        self
    }

    /// Returns a shared reference to this handle.
    pub fn by_ref(&self) -> &Self where Self:Sized{
        self
    }
}
impl<'a, T2: Clone + 'a> StreamRc<'a,T2>{
    /// Creates a handle around a fresh `SenderInner` with no parent, so
    /// nothing upstream feeds it.
    pub fn never() -> StreamRc<'a,T2>{
        let node = Rc::new(UnsafeCell::new(SenderInner::new()));
        let drop_slot = Rc::new(UnsafeCell::new(None));
        StreamRc{
            _parent: None,
            inner: node.clone(),
            remove: node,
            on_drop: drop_slot,
            #[cfg(feature="named")]
            name: None,
        }
    }

    /// Merges every stream yielded by `streams` into one.
    ///
    /// A single fresh `SenderInner` is pushed onto each source, and the
    /// sources' removal handles are collected to become the parent of the
    /// returned stream.
    pub fn merge_all<'b, S, I>(streams: I) -> StreamRc<'a,T2>
    where S: StreamExt<'b,T2>,
          I: IntoIterator<Item = S>
    {
        let sources: Vec<S> = streams.into_iter().collect();
        let inner = Rc::new(UnsafeCell::new(SenderInner::new()));
        // Subscribe to every source before any of them is consumed below.
        for source in &sources {
            unsafe{
                (*source.inner_rc().get()).push(transform_sink_lifetime(inner.clone()));
            }
        }
        let mut parents: Vec<Rc<UnsafeCell<dyn Remove>>> = Vec::with_capacity(sources.len());
        for source in sources {
            // The transmute only rewrites the lifetime on the trait object.
            parents.push(unsafe{ mem::transmute(source.into_remove()) });
        }
        StreamRc::from_remove(parents, inner)
    }

    /// Detaches this stream from its current parent, if any, and attaches it
    /// to `parent`: every value `parent` emits is forwarded to this stream's
    /// inner node.
    pub fn swap_parent<S: StreamExt<'a, T2>>(&mut self, parent: S){
        unsafe{
            if let Some(old_parent) = self._parent.as_ref(){
                (*old_parent.get()).remove_raw(self.inner.get() as *const c_void);
            }
            let target = self.inner.clone();
            let subscription = parent.on_value(move |value| (*target.get()).send(value));
            self._parent = Some(subscription.into_remove());
        }
    }
}
impl<'a, T2: Clone> StreamRc<'a, T2>{
    /// Forwards every value of this stream into `property` by calling `set`
    /// on a clone of it, and returns the resulting stream.
    pub fn connect_to_property<'b,'c>(self, property: &mut Property<'a, T2>) -> StreamRc<'a, T2> where T2: 'c, Self: Sized{
        // The closure owns its own clone, so the caller's borrow ends here.
        let mut target = property.clone();
        let forwarded = self.on_value(move |value| target.set(value));
        forwarded.rc()
    }
}
impl<'a> StreamRc<'a, f64>{
    /// Maps each incoming percentage onto the range of `param` and feeds the
    /// result into it.
    ///
    /// With `Scale::Linear` the percentage is used as is; with `Scale::Log`
    /// it is squared first. The mapped value is `pct * diff + min`, converted
    /// to `T`.
    ///
    /// # Panics
    ///
    /// Panics if any of the `to_f64` / `from_f64` conversions returns `None`.
    #[cfg(feature="numbers")]
    pub fn connect_to_ranged_parameter_pct<T,R>(self, param: &mut RangedPropertyMut<'a,T,R>) -> StreamRc<'a,T>
    where T: Clone + 'a + FromPrimitive + ToPrimitive,
          R: Sub<Output=R> + ToPrimitive + Clone + 'static
    {
        let scale = param.scale();
        let diff = param.diff().to_f64().unwrap();
        let min = param.min().to_f64().unwrap();
        let mapped = self.map(move |pct| {
            let raw = pct.to_f64().unwrap();
            let shaped = match scale {
                Scale::Linear => raw,
                Scale::Log => raw * raw,
            };
            T::from_f64(shaped * diff + min).unwrap()
        });
        let mut target = param.clone().into_property();
        mapped.connect_to_property(&mut target).rc()
    }
}
/// `!stream` yields a stream that carries the negation of every value.
impl<'a> Not for StreamRc<'a,bool>{
    type Output = ::Stream<'a,bool>;
    fn not(self) -> ::Stream<'a,bool>{
        self.map(|flag: bool| !flag)
    }
}
impl<'a> StreamRc<'a,bool>{
pub fn not<'b>(self) -> ::Stream<'b,bool> where 'a: 'b{
self.map(|b| !b)
}
pub fn and_then<T: Clone + 'a, S: StreamExt<'a,T>>(self, then: S) -> ::Stream<'a, T>{
then.filter_by(self.to_property(false))
}
pub fn switch<T: 'a + Clone, S: StreamExt<'a,T>>(self, then: S, else_: S) -> ::Stream<'a,T>{
let switch = self.to_property(false);
let then = then.filter_by(switch.clone());
let else_ = else_.filter_by(!switch);
then.merge(else_)
}
}
/// Access to the shared pointers of a handle, by reference or by value.
impl<'a, T2: 'a> WithInner<'a,T2> for StreamRc<'a,T2>{
    /// Returns another strong reference to the stream node.
    #[inline]
    fn inner_rc(&self) -> Rc<UnsafeCell<dyn StreamInner<'a,T2> + 'a>>{
        Rc::clone(&self.inner)
    }

    /// Returns another strong reference to the removal handle.
    #[inline]
    fn remove_rc(&self) -> Rc<UnsafeCell<dyn Remove + 'a>>{
        Rc::clone(&self.remove)
    }

    /// Moves the whole handle into a new `Rc`, viewed as a stream node.
    #[inline]
    fn into_inner(self) -> Rc<UnsafeCell<dyn StreamInner<'a,T2> + 'a>>{
        let cell = UnsafeCell::new(self);
        Rc::new(cell)
    }

    /// Moves the whole handle into a new `Rc`, viewed as a removal handle.
    #[inline]
    fn into_remove(self) -> Rc<UnsafeCell<dyn Remove + 'a>>{
        let cell = UnsafeCell::new(self);
        Rc::new(cell)
    }
}
/// Removal requests on a handle are delegated to its inner node.
impl<'a, T> Remove for StreamRc<'a,T>{
    #[inline]
    fn remove_raw(&mut self, st: *const c_void){
        let node = self.inner.get();
        unsafe{ (*node).remove_raw(st) }
    }
}
/// A handle is itself a stream node: both operations are delegated to the
/// inner node.
impl<'a, T> StreamInner<'a,T> for StreamRc<'a,T>{
    /// Registers the sink `st` on the inner node.
    #[inline]
    fn push(&mut self, st: Rc<UnsafeCell<dyn SinkInner<'a,T> + 'a>>){
        let node = self.inner.get();
        unsafe{ (*node).push(st) }
    }

    /// Sends `t` through the inner node.
    #[inline]
    fn send(&mut self, t: T){
        let node = self.inner.get();
        unsafe{ (*node).send(t) }
    }
}
impl<'a,T: 'a> StreamExt<'a, T> for StreamRc<'a,T>{
    /// Converts this handle into a `Stream` through its `Into` conversion.
    fn unique(self) -> ::Stream<'a,T>{
        let converted: ::Stream<'a,T> = self.into();
        converted
    }

    /// Already reference counted: returns the handle unchanged.
    fn rc(self) -> StreamRc<'a, T>{
        self
    }
}