use std::cell::UnsafeCell;
use std::rc::Rc;
use std::fmt::Debug;
use std::mem;
use std::os::raw::c_void;
use priv_traits::*;
use utils::*;
use sender::SenderInner;
use traits;
use traits::Stream as StreamT;
use ::Property;
use std::ops::Not;
#[must_use = "droping a stream unsubscribes it and all the chain until the origin sender"]
pub struct Stream<'a, T2: Clone + Debug + 'a>{
_parent: Option<Rc<UnsafeCell<Remove + 'a>>>,
inner: Rc<UnsafeCell<StreamInner<'a, T2> + 'a>>,
remove: Rc<UnsafeCell<Remove + 'a>>,
on_drop: Option<T2>,
#[cfg(feature="named")]
name: Option<String>,
}
impl<'a, T2: Clone + Debug> Drop for Stream<'a,T2>{
fn drop(&mut self){
unsafe{
match self.on_drop.as_ref(){
Some(t) => (*self.inner.get()).send(t.clone()),
_ => (),
}
if let Some(ref mut _parent) = self._parent{
(*_parent.get()).remove_raw(self.inner.get() as *const c_void)
}
}
}
}
impl<'a,T2: Clone + Debug> Stream<'a,T2>{
pub fn from_parent<'p, T1: Clone + Debug, S: traits::Stream<'p, T1>, Inner: StreamInner<'a,T2> + SinkInner<'a,T1> + 'a>(parent: S, inner: Rc<UnsafeCell<Inner>>) -> Stream<'a,T2>{
unsafe{ (*parent.inner_rc().get()).push(transform_sink_lifetime(inner.clone())) };
Stream{
_parent: Some(unsafe{ mem::transmute(parent.into_remove()) }),
inner: inner.clone(),
remove: inner,
on_drop: None,
#[cfg(feature="named")]
name: None,
}
}
pub fn from_remove<'p, T1: Clone + Debug, S: Remove + 'a, Inner: StreamInner<'a,T2> + SinkInner<'a,T1> + 'a>(remove: S, inner: Rc<UnsafeCell<Inner>>) -> Stream<'a,T2>{
Stream{
_parent: Some(Rc::new(UnsafeCell::new(remove))),
inner: inner.clone(),
remove: inner,
on_drop: None,
#[cfg(feature="named")]
name: None,
}
}
pub fn swap_parent<S: traits::Stream<'a, T2>>(&mut self, parent: S){
unsafe{
if let Some(ref mut _parent) = self._parent{
(*_parent.get()).remove_raw(self.inner.get() as *const c_void)
}
let inner = self.inner.clone();
let parent = parent.on_value(move |t| (*inner.get()).send(t));
self._parent = Some(parent.into_remove());
}
}
pub fn new<Inner: StreamInner<'a,T2> + 'a>(inner: Rc<UnsafeCell<Inner>>) -> Stream<'a,T2>{
Stream{
_parent: None,
inner: inner.clone(),
remove: inner,
on_drop: None,
#[cfg(feature="named")]
name: None,
}
}
pub fn never() -> Stream<'a,T2>{
let inner = Rc::new(UnsafeCell::new(SenderInner::new()));
Stream{
_parent: None,
inner: inner.clone(),
remove: inner,
on_drop: None,
#[cfg(feature="named")]
name: None,
}
}
pub fn merge_all<'b, S, I>(streams: I) -> Stream<'a,T2>
where S: ::StreamT<'b,T2>,
I: IntoIterator<Item = S>
{
let mut streams = streams.into_iter().collect::<Vec<_>>();
let inner = Rc::new(UnsafeCell::new(SenderInner::new()));
for stream in streams.iter_mut() {
unsafe{
(*stream.inner_rc().get()).push(transform_sink_lifetime(inner.clone()))
}
}
let parents: Vec<Rc<UnsafeCell<Remove>>> = streams.into_iter()
.map(|stream| unsafe{ mem::transmute(stream.into_remove()) })
.collect();
Stream::from_remove(parents, inner)
}
pub fn connect_to_property<'b,'c>(self, property: &mut Property<'a, T2>) -> Stream<'a, T2> where T2: 'c, Self: Sized{
let mut property = property.clone();
self.on_value(move |t| property.set(t))
}
pub fn send(&self, t: T2){
unsafe{ (*self.inner.get()).send(t) }
}
pub fn on_drop(mut self, value: T2) -> Stream<'a,T2>{
self.on_drop = Some(value);
self
}
#[cfg(feature="named")]
pub fn set_name(mut self, name: &str) -> Stream<'a,T2>{
self.name = Some(name.to_owned());
self
}
pub fn rc(self) -> ::StreamRc<'a,T2>{
self.into()
}
}
impl<'a> Not for Stream<'a,bool>{
type Output = Stream<'a,bool>;
fn not(self) -> Stream<'a,bool>{
self.map(Not::not)
}
}
impl<'a> Stream<'a,bool>{
pub fn not<'b>(self) -> Stream<'b,bool> where 'a: 'b{
self.map(|b| !b)
}
pub fn and_then<T: Clone + Debug + 'a, S: StreamT<'a,T>>(self, then: S) -> Stream<'a, T>{
then.filter_by(self.to_property(false))
}
pub fn switch<T: 'a + Clone + Debug, S: StreamT<'a,T>>(self, then: S, else_: S) -> Stream<'a,T>{
let switch = self.to_property(false);
let then = then.filter_by(switch.clone());
let else_ = else_.filter_by(!switch);
then.merge(else_)
}
}
impl<'a,T2: Clone + Debug> WithInner<'a,T2> for Stream<'a,T2>{
#[inline]
fn inner_rc(&self) -> Rc<UnsafeCell<StreamInner<'a,T2> + 'a>>{
self.inner.clone()
}
#[inline]
fn remove_rc(&self) -> Rc<UnsafeCell<Remove + 'a>>{
self.remove.clone()
}
#[inline]
fn into_inner(self) -> Rc<UnsafeCell<StreamInner<'a,T2> + 'a>>{
Rc::new(UnsafeCell::new(self))
}
#[inline]
fn into_remove(self) -> Rc<UnsafeCell<Remove + 'a>>{
Rc::new(UnsafeCell::new(self))
}
}
impl<'a,T:Clone + Debug> Remove for Stream<'a,T>{
#[inline]
fn remove_raw(&mut self, st: *const c_void){
unsafe{ (*self.inner.get()).remove_raw(st) }
}
}
impl<'a,T: Clone + Debug> StreamInner<'a,T> for Stream<'a,T>{
#[inline]
fn push(&mut self, st: Rc<UnsafeCell<SinkInner<'a,T> + 'a>>){
unsafe{ (*self.inner.get()).push(st) }
}
#[inline]
fn send(&mut self, t: T){
unsafe{ (*self.inner.get()).send(t) }
}
}
impl<'a,T2: Clone + Debug> StreamT<'a,T2> for Stream<'a,T2>{
fn rc(self) -> ::StreamRc<'a,T2>{
self.into()
}
fn unique(self) -> ::Stream<'a,T2>{
self
}
}