#[cfg(test)]
mod tests{
#[cfg(nightly)]
extern crate test;
use stream;
use stream_rc;
use Stream;
use StreamExt;
use Property;
use TryIterLastFrame;
use IndexedProperty;
use Priority;
#[cfg(nightly)]
use test::Bencher;
/// Expects a stream passed through `not()` to publish the negation of every
/// boolean sent, starting from the property's initial `false`.
#[test]
fn not() {
    let (sender, bools) = stream();
    let negated = bools.not().to_property(false);

    // Nothing sent yet: the property still holds its initial value.
    assert_eq!(*negated.get(), false);

    sender.send(false);
    assert_eq!(*negated.get(), true);

    sender.send(true);
    assert_eq!(*negated.get(), false);
}
/// Expects `map` to apply its closure to a sent value before the value is
/// stored in the resulting property.
#[test]
fn map_stream() {
    let (tx, values) = stream();
    let doubled = values.map(|x| x * 2.).to_property(0f32);

    tx.send(1f32);
    assert_eq!(*doubled.get(), 2f32);
}
/// Expects `filter` to pass on only the values accepted by the predicate
/// (here: everything except 5).
#[test]
fn filter() {
    let (tx, source) = stream();
    let without_fives = source.filter(|value| *value != 5);
    let mut received = without_fives.try_iter();

    tx.send(6);
    assert_eq!(received.next(), Some(6));

    // A rejected value produces nothing on the iterator.
    tx.send(5);
    assert_eq!(received.next(), None);

    tx.send(1);
    assert_eq!(received.next(), Some(1));
    assert_eq!(received.next(), None);

    tx.send(5);
    assert_eq!(received.next(), None);
}
/// Expects `filter_map` to emit the mapped value when the closure returns
/// `Some` and to emit nothing when it returns `None`.
#[test]
fn filter_map() {
    let (tx, source) = stream();
    // Doubles every value except 5, which is discarded.
    let doubled = source.filter_map(|value| {
        if value != 5 {
            Some(value * 2)
        } else {
            None
        }
    });
    let mut received = doubled.try_iter();

    tx.send(6);
    assert_eq!(received.next(), Some(12));

    tx.send(5);
    assert_eq!(received.next(), None);

    tx.send(1);
    assert_eq!(received.next(), Some(2));
    assert_eq!(received.next(), None);

    tx.send(5);
    assert_eq!(received.next(), None);
}
/// Expects `filter_by` to let values through only while the boolean
/// property driving it currently holds `true`.
#[test]
fn filter_by() {
    let (gate_tx, gate_stream) = stream();
    let (tx, source) = stream();
    let gate = gate_stream.to_property(false);
    let gated = source.filter_by(gate);
    let mut received = gated.try_iter();

    // Gate open: the value arrives.
    gate_tx.send(true);
    tx.send(6);
    assert_eq!(received.next(), Some(6));

    // Gate closed: the value is withheld.
    gate_tx.send(false);
    tx.send(5);
    assert_eq!(received.next(), None);

    gate_tx.send(true);
    tx.send(1);
    assert_eq!(received.next(), Some(1));
    assert_eq!(received.next(), None);

    gate_tx.send(false);
    tx.send(5);
    assert_eq!(received.next(), None);
}
/// Exercises `flat_map`: each value from the outer stream selects a mapped
/// view of the inner stream, and the inner values multiplied by that outer
/// value are what the iterator yields.
#[test]
fn flat_map() {
    let (outer_tx, outer) = stream();
    let (inner_tx, inner) = stream_rc();
    let mut products = outer
        .flat_map(|factor| {
            inner.clone().map(move |value| factor * value)
        })
        .try_iter();

    outer_tx.send(2);
    for &value in &[2, 3, 4] {
        inner_tx.send(value);
    }
    for &expected in &[4, 6, 8] {
        assert_eq!(products.next(), Some(expected));
    }
    assert_eq!(products.next(), None);

    // A second outer value followed by a single inner value yields one product.
    outer_tx.send(2);
    inner_tx.send(2);
    assert_eq!(products.next(), Some(4));
    assert_eq!(products.next(), None);
}
/// Expects `take(4)` to forward the first four values and nothing afterwards.
#[test]
fn take() {
    let (tx, source) = stream();
    let mut received = source.take(4).try_iter();

    // The first four values are forwarded one by one.
    for value in 0..4 {
        tx.send(value);
        assert_eq!(received.next(), Some(value));
    }

    // Anything sent after the fourth value is not observed.
    for &value in &[5, 6, 1] {
        tx.send(value);
        assert_eq!(received.next(), None);
    }
}
/// Expects `take_while` to forward values while the predicate holds and to
/// stay silent once it has failed, even for later values that would pass.
#[test]
fn take_while() {
    let (tx, source) = stream();
    let mut received = source.take_while(|value| *value < 5).try_iter();

    for value in 0..4 {
        tx.send(value);
        assert_eq!(received.next(), Some(value));
    }

    // 5 fails the predicate; 6 and even 1 afterwards are not observed either.
    for &value in &[5, 6, 1] {
        tx.send(value);
        assert_eq!(received.next(), None);
    }
}
/// Expects `skip(4)` to swallow the first four values and forward the rest.
#[test]
fn skip() {
    let (tx, source) = stream();
    let mut received = source.skip(4).try_iter();

    // Four skipped values: nothing is observable yet.
    for _ in 0..4 {
        tx.send(0);
    }
    assert_eq!(received.next(), None);

    tx.send(1);
    assert_eq!(received.next(), Some(1));
    assert_eq!(received.next(), None);

    tx.send(5);
    assert_eq!(received.next(), Some(5));
}
/// Expects `skip_while` to drop values while the predicate holds and, once
/// it has failed, to forward everything, including values that match again.
#[test]
fn skip_while() {
    let (tx, source) = stream();
    let mut received = source.skip_while(|value| *value == 5).try_iter();

    // Leading 5s are all skipped.
    for _ in 0..4 {
        tx.send(5);
    }
    assert_eq!(received.next(), None);

    tx.send(1);
    assert_eq!(received.next(), Some(1));
    assert_eq!(received.next(), None);

    // A 5 sent after skipping has ended is delivered.
    tx.send(5);
    assert_eq!(received.next(), Some(5));
}
/// Expects `merge` to deliver values from both input streams, and keeps
/// sending on one sender after the merged iterator has gone out of scope.
#[test]
fn merge() {
    let (first_tx, first) = stream();
    let (second_tx, second) = stream();
    {
        let mut merged = first.merge(second).try_iter();

        first_tx.send(0);
        assert_eq!(merged.next(), Some(0));

        second_tx.send(1);
        assert_eq!(merged.next(), Some(1));
    }

    // The merged iterator is out of scope here; these sends must still run
    // to completion for the test to pass.
    for _ in 0..4 {
        second_tx.send(0);
    }
}
/// Expects `Stream::merge_all` to feed values from every merged stream into
/// one property, and keeps sending after that property has gone out of scope.
#[test]
fn merge_all() {
    let (tx_a, stream_a) = stream::<u32>();
    let (tx_b, stream_b) = stream::<u32>();
    let (tx_c, stream_c) = stream::<u32>();
    {
        let sources = vec![stream_a, stream_b, stream_c];
        let latest = Stream::merge_all(sources).to_property(0);
        assert_eq!(*latest.get(), 0);

        tx_a.send(1);
        assert_eq!(*latest.get(), 1);

        tx_b.send(2);
        assert_eq!(*latest.get(), 2);

        tx_c.send(3);
        assert_eq!(*latest.get(), 3);
    }

    // The merged property is out of scope here; sending must still succeed.
    for _ in 0..3 {
        tx_c.send(3);
    }
}
/// Expects `dedup` to forward a value only when it differs from the value
/// sent immediately before it.
#[test]
fn dedup() {
    let (tx, source) = stream();
    let mut received = source.dedup().try_iter();
    assert_eq!(received.next(), None);

    // (value sent, what the iterator is expected to yield right after)
    let steps = [
        (1, Some(1)),
        (1, None),
        (1, None),
        (1, None),
        (2, Some(2)),
        (2, None),
        (2, None),
        (2, None),
        (1, Some(1)),
    ];
    for &(value, expected) in &steps {
        tx.send(value);
        assert_eq!(received.next(), expected);
    }
}
/// Expects `delay(2)` to make the property lag two sends behind the sender.
#[test]
fn delay() {
    let (tx, source) = stream();
    let lagged = source.delay(2).to_property(0);
    assert_eq!(*lagged.get(), 0);

    // (value sent, property value expected right after the send)
    let steps = [(1, 0), (2, 0), (3, 1), (4, 2), (5, 3)];
    for &(value, expected) in &steps {
        tx.send(value);
        assert_eq!(*lagged.get(), expected);
    }
}
/// Expects a property built from `to_bool()` to switch from its initial
/// `false` to `true` once a value has been sent.
#[test]
fn to_bool() {
    let (tx, source) = stream();
    let flag = source.to_bool().to_property(false);

    assert_eq!(*flag.get(), false);
    tx.send(5);
    assert_eq!(*flag.get(), true);
}
/// Expects `zip` to combine two properties into a tuple property that
/// follows updates of either side.
#[test]
fn zip_properties() {
    let (float_tx, float_stream) = stream();
    let (int_tx, int_stream) = stream();
    let floats = float_stream.to_property(0f32);
    let ints = int_stream.to_property(0u32);
    let zipped = floats.zip(ints);

    assert_eq!(*zipped.get(), (0f32, 0u32));

    float_tx.send(1f32);
    assert_eq!(*zipped.get(), (1f32, 0u32));

    int_tx.send(1u32);
    assert_eq!(*zipped.get(), (1f32, 1u32));
}
/// Expects a cloned property to report the same value as the original,
/// both before and after an update.
#[test]
fn clone() {
    let (tx, source) = stream();
    let original = source.to_property(0u32);
    let copy = original.clone();

    assert_eq!(*original.get(), 0);
    assert_eq!(*copy.get(), 0);

    tx.send(1);
    assert_eq!(*original.get(), 1);
    assert_eq!(*copy.get(), 1);
}
/// Expects `is` to report a property and its clone as the same property.
#[test]
fn property_identity() {
    let original = Property::new(0f32);
    let alias = original.clone();
    assert!(original.is(&alias))
}
/// Expects `is` to report two separately created properties as different,
/// even though they hold equal values.
#[test]
fn property_non_identity() {
    let first = Property::new(0f32);
    let second = Property::new(0f32);
    assert!(!first.is(&second))
}
/// Exercises `and_then`: values from the second stream reach the property
/// only when the most recent value on the first (boolean) stream was `true`.
#[test]
fn and_then() {
    let (cond_tx, cond) = stream();
    let (value_tx, values) = stream();
    let result = cond.and_then(values).to_property(0);

    // Condition false: values are ignored.
    cond_tx.send(false);
    value_tx.send(5);
    assert_eq!(*result.get(), 0);
    value_tx.send(5);
    assert_eq!(*result.get(), 0);

    // Condition true: the next value goes through.
    cond_tx.send(true);
    value_tx.send(5);
    assert_eq!(*result.get(), 5);

    // Condition false again: the property keeps its last value.
    cond_tx.send(false);
    value_tx.send(4);
    assert_eq!(*result.get(), 5);
}
/// Expects `set` on a standalone property to be visible through `get`.
#[test]
fn set_property() {
    let mut prop = Property::new(0u32);
    prop.set(1);
    assert_eq!(*prop.get(), 1);
}
/// Expects an `on_value` callback to run while its stream is in scope and
/// to no longer run for values sent after that scope has ended.
#[test]
fn lifetimes_on_value() {
    let mut last_seen = 0;
    let (tx, source) = stream();
    {
        // The callback borrows `last_seen` mutably only inside this scope.
        let _subscription = source.on_value(|value| last_seen = value);
        tx.send(5);
    }
    assert_eq!(last_seen, 5);

    // Sent after the subscription went out of scope: must not be recorded.
    tx.send(4);
    assert_eq!(last_seen, 5);
}
/// Uses the `take_while` predicate itself as a probe: it records every value
/// it is called with, which shows when the predicate stops being invoked.
#[test]
fn lifetimes_take_while() {
    use std::cell::RefCell;

    let last_seen = RefCell::new(0);
    let (tx, source) = stream();
    {
        let _subscription = source.take_while(|value| {
            *last_seen.borrow_mut() = *value;
            *value < 5
        });

        // Each of these reaches the predicate; 5 is the first one to fail it.
        for &value in &[1, 2, 4, 5] {
            tx.send(value);
            assert_eq!(*last_seen.borrow(), value);
        }

        // After the predicate has failed, it is not called again.
        tx.send(6);
        assert_eq!(*last_seen.borrow(), 5);
    }

    // Nor is it called once the stream has left scope.
    tx.send(4);
    assert_eq!(*last_seen.borrow(), 5);
}
/// Nests three scopes of subscribers on one `stream_rc` stream and sends one
/// value at the end of each scope: 1 in the innermost, 2 in the middle and 3
/// in the outermost. The callbacks created in each scope assert which values
/// they are allowed to see.
#[test]
fn lifetimes(){
// Written only by the innermost `on_value` closure; checked after all sends.
let mut a = 0;
{
let (sender, stream) = stream_rc();
// Outermost subscribers accept any of the three values sent in this test.
fn assert(t: i32){
assert!(t==1 || t==2 || t==3);
}
let _on_value = stream.clone().on_value(assert).rc();
let _map = stream.clone().map(assert);
let _filter_map = stream.clone().filter_map(|t| {assert(t); Some(t)});
// With inputs 1, 2, 3 the accumulator seen before each addition is 0, 1, 3.
let _fold = stream.clone().fold(0, |acc,t| {assert(t); assert!(acc==0 || acc==1 || acc==3); acc + t});
{
// Shadows the outer `assert`: subscribers in this scope must never see 3.
fn assert(t: i32){
assert!(t==1 || t==2);
}
// NOTE(review): the outer `_on_value` is used here without `.clone()`,
// unlike the otherwise parallel `drop` test — confirm this is intentional.
let _on_value = _on_value.on_value(assert).rc();
let _map = _on_value.clone().map(assert);
let _filter_map = _on_value.clone().filter_map(|t| {assert(t); Some(t)});
let _fold = _on_value.clone().fold(0, |acc,t| {assert(t); assert!(acc==0 || acc==1); acc + t});
{
// Innermost subscribers: every callback here expects exactly the value 1.
let _stream = _on_value.clone()
.on_value(|t| a = t);
let _on_value = _on_value.clone().on_value(|t| assert_eq!(t,1)).rc();
let _map = _on_value.clone().map(|t| assert_eq!(t,1));
let _filter_map = _on_value.clone().filter_map(|t| {assert_eq!(t,1); Some(t)});
let _fold = _on_value.fold(0, |acc,t| {assert_eq!(t,1); assert_eq!(acc,0); acc + t});
sender.send(1);
}
// Sent after the innermost scope has closed.
sender.send(2);
}
// Sent after the middle scope has closed.
sender.send(3);
// `a` must still be 1: the closure writing it lived only in the innermost
// scope (presumably it stops being called once dropped — confirm against
// the library's drop semantics).
assert_eq!(a, 1);
}
}
/// Variant of the nested-scope subscriber test in which every derived stream
/// is built from a `.clone()` of its parent. One value is sent at the end of
/// each scope (1 innermost, 2 middle, 3 outermost) and each scope's callbacks
/// assert which values they are allowed to see.
#[test]
fn drop(){
// Written only by the innermost `on_value` closure; checked after all sends.
let mut a = 0;
{
let (sender, stream) = stream_rc();
// Outermost subscribers accept any of the three values sent in this test.
fn assert(t: i32){
assert!(t==1 || t==2 || t==3);
}
let _on_value = stream.clone().on_value(assert).rc();
let _map = stream.clone().map(assert);
let _filter_map = stream.clone().filter_map(|t| {assert(t); Some(t)});
// With inputs 1, 2, 3 the accumulator seen before each addition is 0, 1, 3;
// the accumulator is also printed on every call.
let _fold = stream.clone().fold(0, |acc,t| {assert(t); println!("acc {}", acc); assert!(acc==0 || acc==1 || acc==3); acc + t});
{
// Shadows the outer `assert`: subscribers in this scope must never see 3.
fn assert(t: i32){
assert!(t==1 || t==2);
}
let _on_value = _on_value.clone().on_value(assert).rc();
let _map = _on_value.clone().map(assert);
let _filter_map = _on_value.clone().filter_map(|t| {assert(t); Some(t)});
let _fold = _on_value.clone().fold(0, |acc,t| {assert(t); assert!(acc==0 || acc==1); acc + t});
{
// Innermost subscribers: every callback here expects exactly the value 1.
let _stream = _on_value.clone()
.on_value(|t| a = t);
let _on_value = _on_value.clone().on_value(|t| assert_eq!(t,1)).rc();
let _map = _on_value.clone().map(|t| assert_eq!(t,1));
let _filter_map = _on_value.clone().filter_map(|t| {assert_eq!(t,1); Some(t)});
let _fold = _on_value.clone().fold(0, |acc,t| {assert_eq!(t,1); assert_eq!(acc,0); acc + t});
sender.send(1);
}
// Sent after the innermost scope has closed.
sender.send(2);
println!("end inner scope");
}
println!("after inner scope");
// Sent after the middle scope has closed.
sender.send(3);
// `a` must still be 1: the closure writing it lived only in the innermost
// scope (presumably it stops being called once dropped — confirm against
// the library's drop semantics).
assert_eq!(a, 1);
}
}
/// Checks the sender's receiver count around the lifetime of a `try_iter`
/// iterator: still 1 right after the iterator goes out of scope, 0 after the
/// next send.
#[test]
fn drop_iter() {
    let (sender, receiver) = stream();
    {
        let mut values = receiver.try_iter();
        sender.send(1);
        assert_eq!(values.next(), Some(1));
        assert_eq!(values.next(), None);
    }

    // The count only changes once another value has been sent.
    assert_eq!(sender.num_receivers(), 1);
    sender.send(2);
    assert_eq!(sender.num_receivers(), 0);
}
/// Same receiver-count check as for a plain stream, but on a stream created
/// with `stream_rc`.
#[test]
fn drop_iter_rc() {
    let (sender, receiver) = stream_rc();
    {
        let mut values = receiver.try_iter();
        sender.send(1);
        assert_eq!(values.next(), Some(1));
        assert_eq!(values.next(), None);
    }

    // The count only changes once another value has been sent.
    assert_eq!(sender.num_receivers(), 1);
    sender.send(2);
    assert_eq!(sender.num_receivers(), 0);
}
/// Expects a single send to trigger exactly one callback on the stream of a
/// cloned property, even though the original property is alive as well.
#[test]
fn property_updates_once() {
    let (tx, source) = stream_rc();
    let original = source.to_property(1);
    let copy = original.clone();
    let mut updates = 0;
    {
        let _subscription = copy.stream().on_value(|_| updates += 1);
        tx.send(1);
    }
    assert_eq!(updates, 1);
}
/// Checks that streams can carry borrowed data (`&Vec<i32>`), and that a
/// `Cow`-wrapped vector can be moved into a `map` closure and cloned on
/// every event.
#[test]
fn send_references() {
    let vec = vec![3, 4, 5];
    {
        // Part 1: send a reference to `vec` through the stream.
        let (tx, source) = stream_rc();
        let starts_with_three = source
            .filter_map(|vec: &Vec<i32>| {
                if vec[0] == 3 {
                    Some(true)
                } else {
                    None
                }
            })
            .to_property(false);
        tx.send(&vec);
        assert_eq!(*starts_with_three.get(), true);
    }
    {
        // Part 2: `vec` is moved into a `Cow` owned by the `map` closure.
        // Nothing is asserted; the chain only has to build and accept a send.
        let vec: ::std::borrow::Cow<Vec<i32>> = ::std::borrow::Cow::Owned(vec);
        let (tx, source) = stream_rc();
        let cloned = source.map(move |_v| vec.clone());
        let _checked = cloned.filter_map(|vec| {
            if vec[0] == 3 {
                Some(true)
            } else {
                None
            }
        });
        tx.send(5);
    }
}
/// Builds a property from a sender plus a mapped stream (`+ 5`) and expects
/// every `set` to come back through that mapping, for the property and for
/// a clone of it alike.
#[test]
fn property_from_sender_stream() {
    let (tx, source) = stream_rc();
    let plus_five = source.map(|value| value + 5);
    let mut prop = Property::from_sender_stream(tx, plus_five, 0);
    assert_eq!(*prop.get(), 0);

    prop.set(5);
    assert_eq!(*prop.get(), 10);
    prop.set(15);
    assert_eq!(*prop.get(), 20);

    // Setting through the clone is visible on both handles.
    let mut copy = prop.clone();
    assert_eq!(*copy.get(), 20);

    copy.set(5);
    assert_eq!(*copy.get(), 10);
    assert_eq!(*prop.get(), 10);

    copy.set(15);
    assert_eq!(*copy.get(), 20);
    assert_eq!(*prop.get(), 20);
}
/// Drives a `TryIterLastFrame` built from a property's stream and a unit
/// "frame" stream, checking what `next` yields for several interleavings of
/// frame ticks and property updates.
#[test]
fn try_iter_last_frame() {
    let mut prop = Property::new(0);
    let (frame_tx, frames) = stream();
    let mut last_frame = TryIterLastFrame::new(prop.clone().stream(), frames);
    assert_eq!(last_frame.next(), None);

    // Tick first, then update: the update is yielded.
    frame_tx.send(());
    prop.set(1);
    assert_eq!(last_frame.next(), Some(1));

    // Update first, then tick: nothing is yielded.
    prop.set(2);
    frame_tx.send(());
    assert_eq!(last_frame.next(), None);

    // A further update after that tick is yielded.
    prop.set(2);
    assert_eq!(last_frame.next(), Some(2));
}
/// Expects an `IndexedProperty` to expose the collection element selected by
/// its current index, and `set` to change that index.
#[test]
fn indexed_property() {
    let names = vec!["one", "two", "three"];
    let start = 0;
    let mut selected = IndexedProperty::new(names, start);
    assert_eq!(*selected.get(), "one");

    selected.set(1);
    assert_eq!(*selected.get(), "two");

    selected.set(2);
    assert_eq!(*selected.get(), "three");
}
/// Expects `partition` to route values that satisfy the predicate to the
/// first returned stream and all other values to the second.
#[test]
fn partition() {
    let (tx, source) = stream_rc::<usize>();
    let (below_five, the_rest) = source.partition(|value| *value < 5);
    let left = below_five.to_property(0);
    let right = the_rest.to_property(0);

    // (value sent, expected left property, expected right property)
    let steps = [
        (1, 1, 0),
        (2, 2, 0),
        (3, 3, 0),
        (4, 4, 0),
        (5, 4, 5),
        (6, 4, 6),
        (7, 4, 7),
    ];
    for &(value, expected_left, expected_right) in &steps {
        tx.send(value);
        assert_eq!(*left.get(), expected_left);
        assert_eq!(*right.get(), expected_right);
    }
}
/// Registers three `Priority` event streams (levels 1, 3, 2 — in that order),
/// partitions each by a threshold, hands the rejected half back through
/// `set_non_attended`, and checks which level ends up receiving each value.
#[test]
fn priority() {
    let (tx, source) = stream_rc::<usize>();
    let mut dispatcher = Priority::new(source);

    // Registration order (1, 3, 2) is deliberately kept as in the original.
    let events1 = dispatcher.event_stream(1);
    let (attended1, rejected1) = events1.partition(|value| { *value < 5 });
    let events3 = dispatcher.event_stream(3);
    let (attended3, rejected3) = events3.partition(|value| *value < 15);
    let events2 = dispatcher.event_stream(2);
    let (attended2, rejected2) = events2.partition(|value| *value < 10);

    dispatcher.set_non_attended(1, rejected1);
    dispatcher.set_non_attended(3, rejected3);
    dispatcher.set_non_attended(2, rejected2);

    let mut level1 = attended1.try_iter();
    let mut level2 = attended2.try_iter();
    let mut level3 = attended3.try_iter();

    tx.send(4);
    tx.send(7);
    tx.send(12);

    // Each value shows up exactly once, on a single level.
    assert_eq!(level1.next(), Some(4));
    assert_eq!(level1.next(), None);
    assert_eq!(level2.next(), Some(7));
    assert_eq!(level2.next(), None);
    assert_eq!(level3.next(), Some(12));
    assert_eq!(level3.next(), None);
}
}
#[cfg(test)]
#[cfg(feature="unstable")]
mod benches{
extern crate test;
use stream;
use stream_rc;
use StreamExt;
use self::test::Bencher;
/// Benchmarks one send through a `map` → `filter_map` → property chain,
/// reading the property back after every send.
#[bench]
fn map_filter_map(b: &mut Bencher) {
    let mut calls = 0;
    let (tx, source) = stream();
    let result = source
        .map(|value| value + 5.)
        .filter_map(|value| if value > 5. { Some(value) } else { None })
        .to_property(0.);

    b.iter(|| {
        tx.send(5.);
        result.get();
        calls += 1;
    });

    println!("final {} with {} num calls", *result.get(), calls);
}
/// Benchmarks one send fanned out to 1000 independent
/// `map` → `filter_map` → property chains on a shared stream.
#[bench]
fn vector_map_filter_map(b: &mut Bencher) {
    let mut calls = 0;
    let (tx, source) = stream_rc();

    // One chain per subscriber, all cloned from the same source stream.
    let chains: Vec<_> = (0..1000)
        .map(|_| {
            source
                .clone()
                .map(|value| value + 5.)
                .filter_map(|value| if value > 5. { Some(value) } else { None })
                .to_property(0.)
        })
        .collect();

    b.iter(|| {
        tx.send(5.);
        calls += 1;
    });

    println!("final {} with {} num calls", *chains[0].get(), calls);
}
/// Baseline for the stream benchmarks: applies plain `map` and `filter`
/// functions to a 1000-element vector of floats, with no stream machinery.
///
/// Each iteration recomputes every slot from its index and stores the result
/// only when `filter` accepts it.
#[bench]
fn vector_map_filter_map_direct_call(b: &mut Bencher){
    let mut numcalls = 0;

    /// Adds the same constant offset the stream benchmarks use in `map`.
    fn map(v: f32) -> f32 {
        v + 5.
    }

    /// Keeps only values strictly greater than 5.
    fn filter(v: f32) -> Option<f32> {
        if v > 5. { Some(v) } else { None }
    }

    let mut properties = vec![0.; 1000];
    b.iter(|| {
        for (i, p) in properties.iter_mut().enumerate() {
            if let Some(t) = filter(map(i as f32 + 5.)) {
                *p = t;
            }
        }
        numcalls += 1;
    });

    // Report the first slot, matching the `properties[0]` report of the
    // stream-based `vector_map_filter_map` benchmark. The previous code used
    // the *value* of slot 0 (a float) as the index, printing an unrelated slot.
    println!("final {} with {} num calls", properties[0], numcalls);
}
/// Benchmarks one send through a `map` → `filter_map` chain that is consumed
/// with `try_iter` instead of a property.
#[bench]
fn map_filter_iter(b: &mut Bencher) {
    let mut calls = 0;
    let mut last = None;
    let (tx, source) = stream();
    let mut values = source
        .map(|value| value + 5.)
        .filter_map(|value| if value > 5. { Some(value) } else { None })
        .try_iter();

    b.iter(|| {
        tx.send(5.);
        last = values.next();
        calls += 1;
    });

    println!("final {:?} with {} num calls", last, calls);
}
/// Benchmarks one send through a `map` → `filter_map` → `fold` chain whose
/// running sum is stored in a property.
#[bench]
fn map_filter_map_fold(b: &mut Bencher) {
    let mut calls = 0;
    let (tx, source) = stream();
    let total = source
        .map(|value| value + 5.)
        .filter_map(|value| if value > 5. { Some(value) } else { None })
        .fold(0., |sum, value| sum + value)
        .to_property(0.);

    b.iter(|| {
        tx.send(5.);
        calls += 1;
    });

    println!("final {} with {} num calls", *total.get(), calls);
}
/// Baseline for `map_filter_map_fold`: runs the same map / filter_map / fold
/// steps as direct generic function calls, without any stream in between.
#[bench]
fn direct_calls(b: &mut Bencher) {
    // Stand-ins for the stream combinators: each just calls its closure.
    fn map<T1, T2, F: FnMut(T1) -> T2>(t: T1, mut f: F) -> T2 {
        f(t)
    }
    fn filter_map<T1, T2, F: FnMut(T1) -> Option<T2>>(t: T1, mut f: F) -> Option<T2> {
        f(t)
    }
    fn fold<T1, T2, F: FnMut(T2, T1) -> T2>(acc: T2, t: T1, mut f: F) -> T2 {
        f(acc, t)
    }

    let mut acc = 0.;
    let mut calls = 0;
    {
        // The closure borrows `acc` mutably; the block ends that borrow
        // before `acc` is printed below.
        let mut step = |input| {
            if let Some(kept) = filter_map(
                map(input, |t| t + 5.),
                |t| if t > 5. { Some(t) } else { None },
            ) {
                let updated = fold(acc, kept, |sum, t| sum + t);
                acc = updated;
            }
        };
        b.iter(|| {
            step(5.);
            calls += 1;
        });
    }

    println!("final {} with {} num calls", acc, calls);
}
}