use super::{
SystemCondition, SyncSystem, SystemOnceRunner, SystemOnceThreadLocalRunner,
SystemThreadLocal, CreationSystem, SystemConditionElse, CreationSystemOnceRunner
};
#[cfg(feature = "async")]
use super::CreationSystemAsync;
#[cfg(feature = "debug_parameters")]
use super::SystemDebug;
use crate::{EntitiesCreation, Entities, EntitiesThreadLocal, Resources, ResourcesThreadLocal};
#[cfg(feature="parallel_systems")]
use crate::System;
#[cfg(feature = "debug_parameters")]
use crate::EntitiesDebug;
use super::any_system::AnySystem;
use std::any::TypeId;
use pathfinding::directed::{
topological_sort::topological_sort,
bfs::bfs_loop,
strongly_connected_components::strongly_connected_components,
};
use hashbrown::HashMap;
use crate::storage::Storages;
use std::sync::atomic::AtomicUsize;
use crate::resource::{ResourcesContainer, ResourcesCreation};
use crate::utils::{delete_lifetime_mut};
#[cfg(feature="stats_events")]
use seitan::{StreamExt, SenderRc, Property};
use std::path::Path;
use std::slice;
use std::iter;
#[cfg(feature="stats_events")]
use std::time::Duration;
#[cfg(feature="async")]
use super::Task;
#[cfg(feature="async")]
use futures::task::Poll;
#[cfg(feature="async")]
use std::pin::Pin;
#[cfg(feature="async")]
use futures::Future;
#[cfg(any(feature="stats_events", feature = "async"))]
use densevec::DenseVec;
use std::mem;
#[cfg(feature="stats_events")]
use std::io::Write;
#[cfg(feature="stats_events")]
use crate::utils::{time, Stat, thread_id};
#[cfg(feature="dynamic_systems")]
use crate::dynamic_system_loader::DynamicSystemsLoader;
use retain_mut::RetainMut;
use crate::operators::OperatorId;
use crate::MaskType;
#[cfg(all(feature="stats_events", feature = "stdweb"))]
type Instant = f64;
#[cfg(all(feature="stats_events", not(feature = "stdweb")))]
use std::time::Instant;
pub struct SystemInfo<S>{
pub system: S,
pub name: Option<&'static str>,
pub checks: Option<SystemCondition>,
pub els: Option<S>,
pub before: Vec<SystemId>,
pub after: Vec<SystemId>,
pub creates: Vec<TypeId>,
pub updates: Vec<TypeId>,
pub reads: Vec<TypeId>,
pub writes: Vec<TypeId>,
pub needs: Vec<TypeId>,
pub successors: Vec<Priority>,
pub predecessors: Vec<Priority>,
pub resource_mask_r: MaskType,
pub resource_mask_w: MaskType,
pub system_info: &'static str,
}
impl<S> SystemInfo<S> {
fn entities<'a>(&'a self, storages: &'a Storages) -> Entities<'a> {
Entities {
storages: storages.substorages(
Some(&self.resource_mask_r),
Some(&self.resource_mask_w),
self.system_info
)
}
}
fn resources<'a>(&'a self,
resources: &'a ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader) -> Resources<'a>
{
Resources{
resources,
resource_mask_r: Some(&self.resource_mask_r),
resource_mask_w: Some(&self.resource_mask_w),
system_info: self.system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
}
}
}
impl SystemInfo<Box<dyn SystemThreadLocal>> {
fn run(
&mut self,
storages: &Storages,
resources: &ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader
) {
let entities = EntitiesThreadLocal {
storages: storages.substorages(
Some(&self.resource_mask_r),
Some(&self.resource_mask_w),
self.system_info
)
};
let resources = ResourcesThreadLocal{
resources,
resource_mask_r: Some(&self.resource_mask_r),
resource_mask_w: Some(&self.resource_mask_w),
system_info: self.system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
};
let mut system = &mut self.system;
if let Some(checks) = self.checks.as_ref(){
if !checks.check_should_run_thread_local(entities.storages(), &resources){
if let Some(els) = self.els.as_mut() {
system = els;
}else{
return
}
}
}
system.run(entities, resources);
}
}
impl SystemInfo<SystemOnceThreadLocalRunner> {
fn run(
&mut self,
storages: &Storages,
resources: &ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader
) {
let entities = EntitiesThreadLocal {
storages: storages.substorages(
Some(&self.resource_mask_r),
Some(&self.resource_mask_w),
self.system_info
)
};
let resources = ResourcesThreadLocal{
resources,
resource_mask_r: Some(&self.resource_mask_r),
resource_mask_w: Some(&self.resource_mask_w),
system_info: self.system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
};
let mut system = &mut self.system;
if let Some(checks) = self.checks.as_ref(){
if !checks.check_should_run_thread_local(entities.storages(), &resources){
if let Some(els) = self.els.as_mut() {
system = els;
}else{
return
}
}
}
system.run(entities, resources);
}
}
struct Barrier{
name: Option<&'static str>,
after: Vec<SystemId>,
before: Vec<SystemId>,
successors: Vec<Priority>,
predecessors: Vec<Priority>,
}
#[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)]
pub enum Priority{
Send(usize),
SendOnce(usize),
ThreadLocal(usize),
ThreadLocalOnce(usize),
Creation(usize),
CreationOnce(usize),
#[cfg(feature="debug_parameters")]
Debug(usize),
#[cfg(feature="async")]
CreationAsync(usize),
Barrier(usize),
}
impl Priority{
fn graph_id(self) -> String{
match self{
Priority::Send(i) => format!("Send_{}", i),
Priority::SendOnce(i) => format!("SendOnce_{}", i),
Priority::ThreadLocal(i) => format!("ThreadLocal_{}", i),
Priority::ThreadLocalOnce(i) => format!("ThreadLocalOnce_{}", i),
Priority::Creation(i) => format!("Creation_{}", i),
Priority::CreationOnce(i) => format!("CreationOnce_{}", i),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => format!("CreationAsync_{}", i),
#[cfg(feature="debug_parameters")]
Priority::Debug(i) => format!("Debug_{}", i),
Priority::Barrier(i) => format!("Barrier_{}", i),
}
}
pub fn index(self) -> usize {
match self{
Priority::Send(i) => i,
Priority::SendOnce(i) => i,
Priority::ThreadLocal(i) => i,
Priority::ThreadLocalOnce(i) => i,
Priority::Creation(i) => i,
Priority::CreationOnce(i) => i,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => i,
#[cfg(feature="debug_parameters")]
Priority::Debug(i) => i,
Priority::Barrier(i) => i,
}
}
}
#[cfg(feature="stats_events")]
enum SystemStatName{
Send,
SendOnce,
ThreadLocal,
ThreadLocalOnce,
Creation,
CreationOnce,
#[cfg(feature = "async")]
CreationAsync,
#[cfg(feature="debug_parameters")]
Debug,
Barrier,
}
#[cfg(feature="stats_events")]
#[derive(Serialize, Deserialize)]
struct TracingStat{
cat: String,
pid: u64,
tid: u64,
ts: u64,
ph: char,
name: String,
}
#[cfg(feature="stats_events")]
struct PerSystemVec<T>(Vec<DenseVec<T>>);
#[cfg(feature="stats_events")]
impl<T> Default for PerSystemVec<T> {
fn default() -> Self {
PerSystemVec(vec![
DenseVec::new(),
DenseVec::new(),
DenseVec::new(),
DenseVec::new(),
DenseVec::new(),
DenseVec::new(),
DenseVec::new(),
DenseVec::new(),
DenseVec::new()])
}
}
#[cfg(feature="stats_events")]
impl<T> PerSystemVec<T>{
fn index(prio: Priority) -> (SystemStatName, usize){
match prio {
Priority::Send(i) => (SystemStatName::Send, i),
Priority::SendOnce(i) => (SystemStatName::SendOnce, i),
Priority::ThreadLocal(i) => (SystemStatName::ThreadLocal, i),
Priority::ThreadLocalOnce(i) => (SystemStatName::ThreadLocalOnce, i),
Priority::Creation(i) => (SystemStatName::Creation, i),
Priority::CreationOnce(i) => (SystemStatName::CreationOnce, i),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => (SystemStatName::CreationAsync, i),
#[cfg(feature="debug_parameters")]
Priority::Debug(i) => (SystemStatName::Debug, i),
Priority::Barrier(i) => (SystemStatName::Barrier, i),
}
}
fn insert(&mut self, prio: Priority, element: T){
let (ty,idx) = PerSystemVec::<T>::index(prio);
self.0[ty as usize].insert(idx, element);
}
fn clear(&mut self){
for v in &mut self.0{
v.clear();
}
}
fn get(&self, priority: Priority) -> Option<&T>{
let (ty, index) = PerSystemVec::<T>::index(priority);
self.0[ty as usize].get(index)
}
#[cfg(feature="glin")]
fn get_mut(&mut self, priority: Priority) -> Option<&mut T>{
let (ty, index) = PerSystemVec::<T>::index(priority);
self.0[ty as usize].get_mut(index)
}
#[cfg(feature="stats_events")]
#[cfg(feature="parallel_systems")]
fn extend_send<I: Iterator<Item = (usize, T)>>(&mut self, iter: I){
self.0[SystemStatName::Send as usize].extend(iter)
}
#[cfg(feature="stats_events")]
#[cfg(feature="parallel_systems")]
fn extend_send_once<I: Iterator<Item = (usize, T)>>(&mut self, iter: I){
self.0[SystemStatName::SendOnce as usize].extend(iter)
}
fn iter(&self) -> impl Iterator<Item = (Priority, &T)>{
let iter = self.0[SystemStatName::Send as usize].iter().map(|(i,t)| (Priority::Send(i), t))
.chain(self.0[SystemStatName::SendOnce as usize].iter().map(|(i,t)| (Priority::SendOnce(i), t)))
.chain(self.0[SystemStatName::ThreadLocal as usize].iter().map(|(i,t)| (Priority::ThreadLocal(i), t)))
.chain(self.0[SystemStatName::ThreadLocalOnce as usize].iter().map(|(i,t)| (Priority::ThreadLocalOnce(i), t)))
.chain(self.0[SystemStatName::Creation as usize].iter().map(|(i,t)| (Priority::Creation(i), t)))
.chain(self.0[SystemStatName::CreationOnce as usize].iter().map(|(i,t)| (Priority::CreationOnce(i), t)));
#[cfg(feature = "async")]
let iter = iter.chain(self.0[SystemStatName::CreationAsync as usize].iter().map(|(i,t)| (Priority::CreationAsync(i), t)));
#[cfg(feature = "debug_parameters")]
let iter = iter.chain(self.0[SystemStatName::Debug as usize].iter().map(|(i,t)| (Priority::Debug(i), t)));
iter.chain(self.0[SystemStatName::Barrier as usize].iter().map(|(i,t)| (Priority::Barrier(i), t)))
}
fn iter_mut(&mut self) -> impl Iterator<Item = (Priority, &mut T)>{
let send: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::Send as usize]) };
let send_once: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::SendOnce as usize]) };
let tl: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::ThreadLocal as usize]) };
let tl_once: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::ThreadLocalOnce as usize]) };
let creation: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::Creation as usize]) };
let creation_once: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::CreationOnce as usize]) };
#[cfg(feature = "async")]
let creation_async: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::CreationAsync as usize]) };
#[cfg(feature = "debug_parameters")]
let debug: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::Debug as usize]) };
let barrier: &mut DenseVec<T> = unsafe{ mem::transmute(&mut self.0[SystemStatName::Barrier as usize]) };
let iter = send.iter_mut().map(|(i,t)| (Priority::Send(i), t))
.chain(send_once.iter_mut().map(|(i,t)| (Priority::SendOnce(i), t)))
.chain(tl.iter_mut().map(|(i,t)| (Priority::ThreadLocal(i), t)))
.chain(tl_once.iter_mut().map(|(i,t)| (Priority::ThreadLocalOnce(i), t)))
.chain(creation.iter_mut().map(|(i,t)| (Priority::Creation(i), t)))
.chain(creation_once.iter_mut().map(|(i,t)| (Priority::Creation(i), t)));
#[cfg(feature = "async")]
let iter = iter.chain(creation_async.iter_mut().map(|(i,t)| (Priority::CreationAsync(i), t)));
#[cfg(feature = "debug_parameters")]
let iter = iter.chain(debug.iter_mut().map(|(i,t)| (Priority::Debug(i), t)));
iter.chain(barrier.iter_mut().map(|(i,t)| (Priority::Barrier(i), t)))
}
fn send_vec(&self) -> &DenseVec<T>{
&self.0[SystemStatName::Send as usize]
}
fn send_once_vec(&self) -> &DenseVec<T>{
&self.0[SystemStatName::SendOnce as usize]
}
#[cfg(all(feature="stats_events", feature="glin"))]
fn contains_key(&self, priority: Priority) -> bool{
let (ty, index) = PerSystemVec::<T>::index(priority);
self.0[ty as usize].contains_key(index)
}
}
#[derive(Copy,Clone,Debug)]
pub enum StatsType<'a>{
Cpu(&'a str),
Gpu(&'a str),
}
impl<'a> StatsType<'a>{
fn name(&self) -> &str{
match self{
StatsType::Cpu(name) | StatsType::Gpu(name) => name,
}
}
}
#[derive(Hash, PartialEq, Debug, Clone, Eq)]
pub enum SystemId{
TypeId(TypeId),
Name(String),
Barrier(TypeId),
}
impl SystemId{
pub fn of<T: 'static>() -> SystemId{
SystemId::TypeId(TypeId::of::<T>())
}
pub fn by_name(name: &str) -> SystemId{
SystemId::Name(name.to_owned())
}
pub fn barrier<T: super::Barrier + 'static>() -> SystemId{
SystemId::Barrier(TypeId::of::<T>())
}
}
mod mask{
#[cfg(systems_64)]
pub type SystemMask = u64;
#[cfg(not(systems_bigint))]
pub const fn zero() -> SystemMask{
0
}
#[cfg(not(systems_bigint))]
pub const fn one() -> SystemMask{
1
}
#[cfg(systems_128)]
pub type SystemMask = numext_fixed_uint::U128;
#[cfg(systems_256)]
pub type SystemMask = numext_fixed_uint::U256;
#[cfg(systems_512)]
pub type SystemMask = numext_fixed_uint::U512;
#[cfg(systems_1024)]
pub type SystemMask = numext_fixed_uint::U1024;
#[cfg(systems_2048)]
pub type SystemMask = numext_fixed_uint::U2048;
#[cfg(systems_4096)]
pub type SystemMask = numext_fixed_uint::U4096;
#[cfg(systems_bigint)]
pub const fn zero() -> SystemMask{
SystemMask::zero()
}
#[cfg(systems_bigint)]
pub const fn one() -> SystemMask{
SystemMask::one()
}
}
use self::mask::SystemMask;
use crate::bitmask::MaskType as ResourceMask;
use crate::bitmask as resource_mask;
use crate::bitmask::MAX_COMPONENTS;
#[cfg(systems_64)]
pub const MAX_SYSTEMS: usize = 64;
#[cfg(systems_128)]
pub const MAX_SYSTEMS: usize = 128;
#[cfg(systems_256)]
pub const MAX_SYSTEMS: usize = 256;
#[cfg(systems_512)]
pub const MAX_SYSTEMS: usize = 512;
#[cfg(systems_1024)]
pub const MAX_SYSTEMS: usize = 1024;
#[cfg(systems_2048)]
pub const MAX_SYSTEMS: usize = 2048;
#[cfg(systems_4096)]
pub const MAX_SYSTEMS: usize = 4096;
#[cfg(feature="stats_events")]
struct StatsSender<'a>{
then: Instant,
tid: u64,
stats: Option<&'a mut PerSystemVec<Stat>>,
#[cfg(feature="parallel_systems")]
sender: Option<crossbeam::channel::Sender<(usize, Stat)>>,
prio: Priority,
#[cfg(feature="glin")]
counter: Option<&'a mut glin::query::Duration>,
}
#[cfg(feature="stats_events")]
impl<'a> StatsSender<'a> {
fn new(stats: &mut PerSystemVec<Stat>, prio: Priority) -> StatsSender{
StatsSender {
#[cfg(feature = "stdweb")]
then: stdweb::web::Date::now(),
#[cfg(not(feature = "stdweb"))]
then: time::now(),
tid: thread_id(),
stats: Some(stats),
#[cfg(feature="parallel_systems")]
sender: None,
prio,
#[cfg(feature="glin")]
counter: None,
}
}
#[cfg(feature="glin")]
fn new_with_gpu_counters(
stats: &'a mut PerSystemVec<Stat>,
gpu_stats_counters: &'a mut PerSystemVec<glin::query::Duration>,
gpu_stats: &'a mut Vec<(Priority, Duration)>,
prio: Priority) -> StatsSender<'a>
{
let counter = if let Some(counter) = gpu_stats_counters.get_mut(prio){
if let Some(duration) = counter.result() {
gpu_stats.push((prio, duration));
}
counter.begin();
Some(counter)
}else{
None
};
StatsSender {
then: time::now(),
tid: thread_id(),
stats: Some(stats),
#[cfg(feature="parallel_systems")]
sender: None,
prio,
counter
}
}
#[cfg(feature="parallel_systems")]
fn new_with_stats_sender(sender: crossbeam::channel::Sender<(usize, Stat)>, prio: Priority) -> StatsSender<'a> {
StatsSender {
#[cfg(feature = "stdweb")]
then: stdweb::web::Date::now(),
#[cfg(not(feature = "stdweb"))]
then: time::now(),
tid: thread_id(),
stats: None,
sender: Some(sender),
prio,
#[cfg(feature="glin")]
counter: None,
}
}
}
#[cfg(feature="stats_events")]
impl<'a> Drop for StatsSender<'a>{
fn drop(&mut self){
#[cfg(feature="glin")]
{
if let Some(counter) = self.counter.take() {
counter.end();
}
}
let now = time::now();
let stat = Stat::new(self.then, now, self.tid);
#[cfg(feature="parallel_systems")]
{
if let Some(stats) = self.stats.take(){
stats.insert(self.prio, stat);
}else if let Some(sender) = self.sender.take() {
sender.send((self.prio.index(), stat)).unwrap();
}
}
#[cfg(not(feature="parallel_systems"))]
self.stats.take().unwrap().insert(self.prio, stat);
}
}
#[derive(Default)]
pub struct Systems{
pub(super) systems: Vec<SystemInfo<SyncSystem>>,
pub(super) systems_once: Vec<SystemInfo<SystemOnceRunner>>,
pub(super) systems_thread_local: Vec<SystemInfo<Box<dyn SystemThreadLocal>>>,
pub(super) systems_once_thread_local: Vec<SystemInfo<SystemOnceThreadLocalRunner>>,
pub(super) systems_creation: Vec<SystemInfo<Box<dyn CreationSystem>>>,
pub(super) systems_creation_once: Vec<SystemInfo<CreationSystemOnceRunner>>,
#[cfg(feature = "debug_parameters")]
pub(super) systems_debug: Vec<SystemInfo<Box<dyn SystemDebug>>>,
#[cfg(feature = "async")]
pub(super) async_tasks: DenseVec<SystemInfo<Arc<Task>>>,
barriers: Vec<Barrier>,
id_index: HashMap<SystemId, Priority>,
#[cfg(feature="stats_events")]
prev_stats: PerSystemVec<Stat>,
#[cfg(feature="stats_events")]
stats: PerSystemVec<Stat>,
#[cfg(feature="stats_events")]
gpu_stats: Vec<(Priority, Duration)>,
#[cfg(feature="stats_events")]
stats_events: PerSystemVec<(String, SenderRc<'static, Stat>)>,
#[cfg(all(feature="stats_events", feature="glin"))]
gpu_stats_counters: PerSystemVec<glin::query::Duration>,
#[cfg(feature="stats_events")]
gpu_stats_events: PerSystemVec<(String, SenderRc<'static, Duration>)>,
#[cfg(feature="stats_events")]
enabled_systems: PerSystemVec<Property<'static, bool>>,
#[cfg(feature="stats_events")]
send_enabled_systems: DenseVec<bool>,
#[cfg(feature="stats_events")]
send_enabled_systems_once: DenseVec<bool>,
needs_dependencies_refresh: bool,
needs_predecessors_refresh: bool,
}
#[cfg(feature="parallel_systems")]
#[derive(Clone)]
pub struct SendSystems<'a>{
systems: &'a [SystemInfo<SyncSystem>],
systems_once: &'a [SystemInfo<SystemOnceRunner>],
#[cfg(feature="stats_events")]
enabled_systems: &'a DenseVec<bool>,
#[cfg(feature="stats_events")]
enabled_systems_once: &'a DenseVec<bool>,
#[cfg(feature="stats_events")]
sender_send: crossbeam::channel::Sender<(usize, Stat)>,
#[cfg(feature="stats_events")]
sender_send_once: crossbeam::channel::Sender<(usize, Stat)>,
}
#[cfg(feature="parallel_systems")]
pub struct SendSystemWrapper<'e, S> {
system: S,
entities: Entities<'e>,
resources: Resources<'e>,
#[cfg(all(feature="stats_events"))]
stats_sender: Option<crossbeam::channel::Sender<(usize, Stat)>>,
#[cfg(feature="stats_events")]
prio: Priority,
}
#[cfg(feature="parallel_systems")]
unsafe impl<'e,S> Send for SendSystemWrapper<'e,S>{}
#[cfg(feature="parallel_systems")]
unsafe impl<'e,S> Sync for SendSystemWrapper<'e,S>{}
#[cfg(feature="parallel_systems")]
impl<'e, 's> SendSystemWrapper<'e, &'s mut dyn System> {
pub fn run(self) {
#[cfg(feature="stats_events")]
let _stats_sender = {
let prio = self.prio;
self.stats_sender.map(|stats_sender| {
Some(StatsSender::new_with_stats_sender(stats_sender, prio))
})
};
self.system.run(self.entities, self.resources);
}
}
#[cfg(feature="parallel_systems")]
impl<'e, 's> SendSystemWrapper<'e, &'s SystemOnceRunner> {
pub fn run(self) {
#[cfg(feature="stats_events")]
let _stats_sender = {
let prio = self.prio;
self.stats_sender.map(|stats_sender| {
Some(StatsSender::new_with_stats_sender(stats_sender, prio))
})
};
self.system.run(self.entities, self.resources);
}
}
#[cfg(feature="parallel_systems")]
impl<'a> SendSystems<'a>{
pub fn send_system(self,
idx: usize,
storages: &'a Storages,
resources: &'a ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &'a DynamicSystemsLoader
) -> Option<SendSystemWrapper<'a, &'a mut dyn System>>
{
let system_w = &self.systems[idx];
#[cfg(feature="stats_events")]
let prio = Priority::Send(idx);
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(idx).copied() != Some(false);
#[cfg(feature="stats_events")]
let stats_sender= if has_stats && enabled {
Some(self.sender_send)
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let entities = system_w.entities(storages);
let resources = system_w.resources(
resources,
#[cfg(feature="dynamic_systems")]
dynamic_systems
);
let mut system = &system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run(entities.storages(), &resources){
if let Some(els) = system_w.els.as_ref() {
system = els;
}else{
return None
}
}
}
Some(SendSystemWrapper{
system: unsafe{ system.borrow_mut() },
entities,
resources,
#[cfg(feature="stats_events")]
stats_sender,
#[cfg(feature="stats_events")]
prio
})
}else{
None
}
}
pub fn send_once_system(self,
idx: usize,
storages: &'a Storages,
resources: &'a ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &'a DynamicSystemsLoader
) -> Option<SendSystemWrapper<'a, &'a SystemOnceRunner>>
{
#[cfg(feature="stats_events")]
let prio = Priority::SendOnce(idx);
let system_w = &self.systems_once[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems_once.get(idx).copied() != Some(false);
#[cfg(feature="stats_events")]
let stats_sender= if has_stats && enabled {
Some(self.sender_send_once)
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let entities = system_w.entities(storages);
let resources = system_w.resources(
resources,
#[cfg(feature="dynamic_systems")]
dynamic_systems
);
let mut system = &system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run(entities.storages(), &resources){
if let Some(els) = system_w.els.as_ref() {
system = els;
}else{
return None
}
}
}
Some(SendSystemWrapper {
system,
entities,
resources,
#[cfg(feature="stats_events")]
stats_sender,
#[cfg(feature="stats_events")]
prio
})
}else{
None
}
}
}
#[cfg(all(feature="parallel_systems", feature="stats_events"))]
pub struct SendSystemsStats<'a>{
stats: &'a mut PerSystemVec<Stat>,
receiver_send: crossbeam::channel::Receiver<(usize, Stat)>,
receiver_send_once: crossbeam::channel::Receiver<(usize, Stat)>,
}
#[cfg(all(feature="parallel_systems", feature="stats_events"))]
impl<'a> SendSystemsStats<'a>{
pub fn receive(&mut self){
self.stats.extend_send(self.receiver_send.try_iter());
self.stats.extend_send_once(self.receiver_send_once.try_iter());
}
}
#[cfg(all(feature="parallel_systems", not(feature="stats_events")))]
pub type SendSystemsStats = ();
impl Systems{
#[cfg(feature="stats_events")]
pub fn reset_stats(&mut self){
self.send_enabled_systems.clear();
let enabled_systems = self.enabled_systems.send_vec().iter().map(|(p, e)| (p, *e.get()));
self.send_enabled_systems.extend(enabled_systems);
self.send_enabled_systems_once.clear();
let enabled_systems_once = self.enabled_systems.send_once_vec().iter().map(|(p, e)| (p, *e.get()));
self.send_enabled_systems_once.extend(enabled_systems_once);
self.prev_stats.clear();
mem::swap(&mut self.stats, &mut self.prev_stats);
#[cfg(feature="glin")]
self.gpu_stats.clear();
}
#[cfg(feature="parallel_systems")]
pub fn send_systems(&mut self) -> (SendSystems, SendSystemsStats){
#[cfg(feature="stats_events")]
let (sender_send, receiver_send) = crossbeam::channel::bounded(self.systems.len());
#[cfg(feature="stats_events")]
let (sender_send_once, receiver_send_once) = crossbeam::channel::bounded(self.systems.len());
let send = SendSystems{
systems: &self.systems,
systems_once: &self.systems_once,
#[cfg(feature="stats_events")]
enabled_systems: &self.send_enabled_systems,
#[cfg(feature="stats_events")]
enabled_systems_once: &self.send_enabled_systems_once,
#[cfg(feature="stats_events")]
sender_send,
#[cfg(feature="stats_events")]
sender_send_once,
};
#[cfg(feature="stats_events")]
let tl = SendSystemsStats{
stats: &mut self.stats,
receiver_send,
receiver_send_once,
};
#[cfg(not(feature="stats_events"))]
let tl = ();
(send, tl)
}
pub fn run_send_system(
&mut self,
idx: usize,
storages: &Storages,
resources: &ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader
)
{
#[cfg(feature="stats_events")]
let prio = Priority::Send(idx);
let system_w = &mut self.systems[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.send_enabled_systems.get(idx).copied() != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
Some(StatsSender::new(&mut self.stats, prio))
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let entities = system_w.entities(storages);
let resources = system_w.resources(
resources,
#[cfg(feature="dynamic_systems")]
dynamic_systems
);
let mut system = &system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run(entities.storages(), &resources){
if let Some(els) = system_w.els.as_ref() {
system = els;
}else{
return
}
}
}
let system = unsafe{ system.borrow_mut() };
system.run(entities, resources);
}
}
pub fn run_send_once_system(
&mut self,
idx: usize,
storages: &Storages,
resources: &ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader
)
{
#[cfg(feature="stats_events")]
let prio = Priority::SendOnce(idx);
let system_w = &self.systems_once[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.send_enabled_systems.get(idx).copied() != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
Some(StatsSender::new(&mut self.stats, prio))
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let entities = system_w.entities(storages);
let resources = system_w.resources(
resources,
#[cfg(feature="dynamic_systems")]
dynamic_systems
);
let mut system = &system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run(entities.storages(), &resources){
if let Some(els) = system_w.els.as_ref() {
system = els;
}else{
return
}
}
}
system.run(entities, resources);
}
}
pub fn run_thread_local_system(&mut self,
idx: usize,
storages: &Storages,
resources: &ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader
){
#[cfg(feature="stats_events")]
let prio = Priority::ThreadLocal(idx);
let system_w = &mut self.systems_thread_local[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(prio).map(|e| *e.get()) != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
#[cfg(feature="glin")]
{
Some(StatsSender::new_with_gpu_counters(&mut self.stats, &mut self.gpu_stats_counters, &mut self.gpu_stats, prio))
}
#[cfg(not(feature="glin"))]
{
Some(StatsSender::new(&mut self.stats, prio))
}
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
system_w.run(
storages,
resources,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
)
}
}
pub fn run_thread_local_once_system(&mut self,
idx: usize,
storages: &Storages,
resources: &ResourcesContainer,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader
){
#[cfg(feature="stats_events")]
let prio = Priority::ThreadLocalOnce(idx);
let system_w = &mut self.systems_once_thread_local[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(prio).map(|e| *e.get()) != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
#[cfg(feature="glin")]
{
Some(StatsSender::new_with_gpu_counters(&mut self.stats, &mut self.gpu_stats_counters, &mut self.gpu_stats, prio))
}
#[cfg(not(feature="glin"))]
{
Some(StatsSender::new(&mut self.stats, prio))
}
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
system_w.run(
storages,
resources,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
)
}
}
pub fn run_creation_system(&mut self, idx: usize, entities: EntitiesCreation, resources: ResourcesCreation){
#[cfg(feature="stats_events")]
let prio = Priority::Creation(idx);
let system_w = &mut self.systems_creation[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(prio).map(|e| *e.get()) != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
#[cfg(feature="glin")]
{
Some(StatsSender::new_with_gpu_counters(&mut self.stats, &mut self.gpu_stats_counters, &mut self.gpu_stats, prio))
}
#[cfg(not(feature="glin"))]
{
Some(StatsSender::new(&mut self.stats, prio))
}
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let mut system = &mut system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run_creation(
&entities.storages().substorages_all(),
&resources)
{
if let Some(els) = system_w.els.as_mut(){
system = els;
}else{
return
}
}
}
system.run(entities, resources);
}
}
pub fn run_creation_system_once(&mut self, idx: usize, entities: EntitiesCreation, resources: ResourcesCreation){
#[cfg(feature="stats_events")]
let prio = Priority::CreationOnce(idx);
let system_w = &mut self.systems_creation_once[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(prio).map(|e| *e.get()) != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
#[cfg(feature="glin")]
{
Some(StatsSender::new_with_gpu_counters(&mut self.stats, &mut self.gpu_stats_counters, &mut self.gpu_stats, prio))
}
#[cfg(not(feature="glin"))]
{
Some(StatsSender::new(&mut self.stats, prio))
}
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let mut system = &mut system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run_creation(
&entities.storages().substorages_all(),
&resources)
{
if let Some(els) = system_w.els.as_mut(){
system = els;
}else{
return
}
}
}
system.run(entities, resources);
}
}
#[cfg(feature = "async")]
pub fn run_creation_system_async(&mut self, idx: usize, entities: CreationProxy, resources: ResourcesCreation) -> bool{
let prio = Priority::CreationAsync(idx);
let async_tasks = &mut self.async_tasks;
let system_w = async_tasks.get_mut(idx);
if system_w.is_none(){
return true;
}
let system_w = system_w.unwrap();
let system_w: &mut SystemInfo<_> = unsafe{ delete_lifetime_mut(system_w) };
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(prio).map(|e| *e.get()) != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
#[cfg(feature="glin")]
{
Some(StatsSender::new_with_gpu_counters(&mut self.stats, &mut self.gpu_stats_counters, &mut self.gpu_stats, prio))
}
#[cfg(not(feature="glin"))]
{
Some(StatsSender::new(&mut self.stats, prio))
}
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
let mut task = &mut system_w.system;
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run_creation(
&entities.storages().substorages_all(),
&resources)
{
if let Some(els) = system_w.els.as_mut(){
task = els;
}else{
return false;
}
}
}
if Poll::Ready(()) == Task::run(task){
async_tasks.remove(idx);
true
}else{
false
}
}else{
false
}
}
#[cfg(feature = "debug_parameters")]
pub fn run_debug_system(&mut self, idx: usize, entities: EntitiesDebug, resources: ResourcesThreadLocal){
#[cfg(feature="stats_events")]
let prio = Priority::Debug(idx);
let system_w = &mut self.systems_debug[idx];
#[cfg(feature="stats_events")]
let has_stats = system_w.name.is_some();
#[cfg(feature="stats_events")]
let enabled = !has_stats || self.enabled_systems.get(prio).map(|e| *e.get()) != Some(false);
#[cfg(feature="stats_events")]
let _stat_sender= if has_stats && enabled {
#[cfg(feature="glin")]
{
Some(StatsSender::new_with_gpu_counters(&mut self.stats, &mut self.gpu_stats_counters, &mut self.gpu_stats, prio))
}
#[cfg(not(feature="glin"))]
{
Some(StatsSender::new(&mut self.stats, prio))
}
}else{
None
};
#[cfg(not(feature="stats_events"))]
let enabled = true;
if enabled {
if let Some(checks) = system_w.checks.as_ref(){
if !checks.check_should_run_thread_local(entities.storages(), &resources){
return
}
}
system_w.system.run(entities, resources);
}
}
pub fn num_send_systems(&self) -> usize{
self.systems.len()
}
pub fn num_non_send_systems(&self) -> usize{
#[allow(unused_mut)]
let mut total = self.systems_thread_local.len() + self.systems_creation.len();
#[cfg(feature = "debug_parameters")]
{
total += self.systems_debug.len();
}
#[cfg(feature = "async")]
{
total += self.async_tasks.len();
}
total
}
pub fn num_total_systems(&self) -> usize{
self.num_send_systems() + self.num_non_send_systems()
}
pub fn system_name(&self, priority: Priority) -> String {
match priority {
Priority::Send(i) => self.systems[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed send system {}", i)),
Priority::SendOnce(i) => self.systems_once[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed send once system {}", i)),
Priority::ThreadLocal(i) => self.systems_thread_local[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed thread local system {}", i)),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed thread local once system {}", i)),
Priority::Creation(i) => self.systems_creation[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed creation system {}", i)),
Priority::CreationOnce(i) => self.systems_creation_once[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed creation system once {}", i)),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed creation async system {}", i)),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Unnamed debug system {}", i)),
Priority::Barrier(i) => self.barriers[i].name.map(|s| s.to_owned())
.unwrap_or_else(|| format!("Barrier {}", i))
}
}
#[cfg(feature="debug_systems_permissions")]
pub fn system_file_line_info(&self, priority: Priority) -> &'static str {
match priority {
Priority::Send(i) => self.systems[i].system.file_line_info(),
Priority::SendOnce(i) => self.systems_once[i].name.unwrap_or(""),
Priority::ThreadLocal(i) => self.systems_thread_local[i].system.file_line_info(),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[i].name.unwrap_or(""),
Priority::Creation(i) => self.systems_creation[i].system.file_line_info(),
Priority::CreationOnce(i) => self.systems_creation_once[i].name.unwrap_or(""),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[i].system.file_line_info(),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[i].name.unwrap_or(""),
Priority::Barrier(i) => "Barrier",
}
}
#[cfg(not(feature="debug_systems_permissions"))]
pub fn system_file_line_info(&self, priority: Priority) -> &'static str {
match priority {
Priority::Send(i) => self.systems[i].name.unwrap_or(""),
Priority::SendOnce(i) => self.systems_once[i].name.unwrap_or(""),
Priority::ThreadLocal(i) => self.systems_thread_local[i].name.unwrap_or(""),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[i].name.unwrap_or(""),
Priority::Creation(i) => self.systems_creation[i].name.unwrap_or(""),
Priority::CreationOnce(i) => self.systems_creation_once[i].name.unwrap_or(""),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[i].name.unwrap_or(""),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[i].name.unwrap_or(""),
Priority::Barrier(i) => self.barriers[i].name.unwrap_or(""),
}
}
pub fn system_updates(&self, priority: Priority) -> &[TypeId]{
match priority {
Priority::Send(i) => &self.systems[i].updates,
Priority::SendOnce(i) => &self.systems_once[i].updates,
Priority::ThreadLocal(i) => &self.systems_thread_local[i].updates,
Priority::ThreadLocalOnce(i) => &self.systems_once_thread_local[i].updates,
Priority::Creation(i) => &self.systems_creation[i].updates,
Priority::CreationOnce(i) => &self.systems_creation_once[i].updates,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &self.async_tasks[i].updates,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &self.systems_debug[i].updates,
Priority::Barrier(_) => &[]
}
}
pub fn system_needs(&self, priority: Priority) -> &[TypeId]{
match priority {
Priority::Send(i) => &self.systems[i].needs,
Priority::SendOnce(i) => &self.systems_once[i].needs,
Priority::ThreadLocal(i) => &self.systems_thread_local[i].needs,
Priority::ThreadLocalOnce(i) => &self.systems_once_thread_local[i].needs,
Priority::Creation(i) => &self.systems_creation[i].needs,
Priority::CreationOnce(i) => &self.systems_creation_once[i].needs,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &self.async_tasks[i].needs,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &self.systems_debug[i].needs,
Priority::Barrier(_) => &[]
}
}
pub fn system_writes(&self, priority: Priority) -> &[TypeId]{
match priority {
Priority::Send(i) => &self.systems[i].writes,
Priority::SendOnce(i) => &self.systems_once[i].writes,
Priority::ThreadLocal(i) => &self.systems_thread_local[i].writes,
Priority::ThreadLocalOnce(i) => &self.systems_once_thread_local[i].writes,
Priority::Creation(i) => &self.systems_creation[i].writes,
Priority::CreationOnce(i) => &self.systems_creation_once[i].writes,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &self.async_tasks[i].writes,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &self.systems_debug[i].writes,
Priority::Barrier(_) => &[]
}
}
pub fn system_reads(&self, priority: Priority) -> &[TypeId]{
match priority {
Priority::Send(i) => &self.systems[i].reads,
Priority::SendOnce(i) => &self.systems_once[i].reads,
Priority::ThreadLocal(i) => &self.systems_thread_local[i].reads,
Priority::ThreadLocalOnce(i) => &self.systems_once_thread_local[i].reads,
Priority::Creation(i) => &self.systems_creation[i].reads,
Priority::CreationOnce(i) => &self.systems_creation_once[i].reads,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &self.async_tasks[i].reads,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &self.systems_debug[i].reads,
Priority::Barrier(_) => &[]
}
}
pub fn system_predecessors(&self, priority: Priority) -> &[Priority]{
match priority {
Priority::Send(i) => &self.systems[i].predecessors,
Priority::SendOnce(i) => &self.systems_once[i].predecessors,
Priority::ThreadLocal(i) => &self.systems_thread_local[i].predecessors,
Priority::ThreadLocalOnce(i) => &self.systems_once_thread_local[i].predecessors,
Priority::Creation(i) => &self.systems_creation[i].predecessors,
Priority::CreationOnce(i) => &self.systems_creation_once[i].predecessors,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &self.async_tasks[i].predecessors,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &self.systems_debug[i].predecessors,
Priority::Barrier(i) => &self.barriers[i].predecessors
}
}
pub fn system_predecessors_mut(&mut self, priority: Priority) -> &mut Vec<Priority>{
match priority {
Priority::Send(i) => &mut self.systems[i].predecessors,
Priority::SendOnce(i) => &mut self.systems_once[i].predecessors,
Priority::ThreadLocal(i) => &mut self.systems_thread_local[i].predecessors,
Priority::ThreadLocalOnce(i) => &mut self.systems_once_thread_local[i].predecessors,
Priority::Creation(i) => &mut self.systems_creation[i].predecessors,
Priority::CreationOnce(i) => &mut self.systems_creation_once[i].predecessors,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &mut self.async_tasks[i].predecessors,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &mut self.systems_debug[i].predecessors,
Priority::Barrier(i) => &mut self.barriers[i].predecessors
}
}
pub fn system_successors(&self, priority: Priority) -> &[Priority]{
match priority {
Priority::Send(i) => &self.systems[i].successors,
Priority::SendOnce(i) => &self.systems_once[i].successors,
Priority::ThreadLocal(i) => &self.systems_thread_local[i].successors,
Priority::ThreadLocalOnce(i) => &self.systems_once_thread_local[i].successors,
Priority::Creation(i) => &self.systems_creation[i].successors,
Priority::CreationOnce(i) => &self.systems_creation_once[i].successors,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &self.async_tasks[i].successors,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &self.systems_debug[i].successors,
Priority::Barrier(i) => &self.barriers[i].successors
}
}
pub fn system_successors_mut(&mut self, priority: Priority) -> &mut Vec<Priority>{
match priority {
Priority::Send(i) => &mut self.systems[i].successors,
Priority::SendOnce(i) => &mut self.systems_once[i].successors,
Priority::ThreadLocal(i) => &mut self.systems_thread_local[i].successors,
Priority::ThreadLocalOnce(i) => &mut self.systems_once_thread_local[i].successors,
Priority::Creation(i) => &mut self.systems_creation[i].successors,
Priority::CreationOnce(i) => &mut self.systems_creation_once[i].successors,
#[cfg(feature = "async")]
Priority::CreationAsync(i) => &mut self.async_tasks[i].successors,
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => &mut self.systems_debug[i].successors,
Priority::Barrier(i) => &mut self.barriers[i].successors
}
}
pub fn succeded_by(&self, prec: Priority, succ: Priority) -> bool{
let node_successors = self.node_successors();
node_successors(&prec)
.any(|prec_succ| prec_succ == succ || self.succeded_by(prec_succ, succ))
}
pub fn preceded_by(&self, succ: Priority, prec: Priority) -> bool{
let node_predecessors = self.node_predecessors();
node_predecessors(&succ)
.any(|succ_prec| succ_prec == prec || self.preceded_by(succ_prec, prec))
}
fn add_succesor(&mut self, pred: Priority, succ: Priority){
match pred {
Priority::Send(i) => self.systems[i].successors.push(succ),
Priority::SendOnce(i) => self.systems_once[i].successors.push(succ),
Priority::ThreadLocal(i) => self.systems_thread_local[i].successors.push(succ),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[i].successors.push(succ),
Priority::Creation(i) => self.systems_creation[i].successors.push(succ),
Priority::CreationOnce(i) => self.systems_creation_once[i].successors.push(succ),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[i].successors.push(succ),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[i].successors.push(succ),
Priority::Barrier(i) => self.barriers[i].successors.push(succ),
}
}
fn add_predecesor(&mut self, succ: Priority, pred: Priority){
match succ {
Priority::Send(i) => self.systems[i].predecessors.push(pred),
Priority::SendOnce(i) => self.systems_once[i].predecessors.push(pred),
Priority::ThreadLocal(i) => self.systems_thread_local[i].predecessors.push(pred),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[i].predecessors.push(pred),
Priority::Creation(i) => self.systems_creation[i].predecessors.push(pred),
Priority::CreationOnce(i) => self.systems_creation_once[i].predecessors.push(pred),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[i].predecessors.push(pred),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[i].predecessors.push(pred),
Priority::Barrier(i) => self.barriers[i].predecessors.push(pred),
}
}
fn needs_index(&self) -> HashMap<TypeId, Vec<Priority>>{
let mut index = HashMap::default();
for system in self.graph_nodes() {
match system{
Priority::Send(i) => for needs in self.systems[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
Priority::SendOnce(i) => for needs in self.systems_once[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
Priority::ThreadLocal(i) => for needs in self.systems_thread_local[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
Priority::ThreadLocalOnce(i) => for needs in self.systems_once_thread_local[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
Priority::Creation(i) => for needs in self.systems_creation[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
Priority::CreationOnce(i) => for needs in self.systems_creation_once[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
#[cfg(feature = "async")]
Priority::CreationAsync(i) => for needs in self.async_tasks[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => for needs in self.systems_debug[i].needs.iter(){
index.entry(*needs).or_insert_with(|| vec![]).push(system)
},
Priority::Barrier(_) => (),
}
}
index
}
fn updates_index(&self) -> HashMap<TypeId, Vec<Priority>>{
let mut index = HashMap::default();
for system in self.graph_nodes() {
match system{
Priority::Send(i) => for updates in self.systems[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
Priority::SendOnce(i) => for updates in self.systems_once[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
Priority::ThreadLocal(i) => for updates in self.systems_thread_local[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
Priority::ThreadLocalOnce(i) => for updates in self.systems_once_thread_local[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
Priority::Creation(i) => for updates in self.systems_creation[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
Priority::CreationOnce(i) => for updates in self.systems_creation_once[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
#[cfg(feature = "async")]
Priority::CreationAsync(i) => for updates in self.async_tasks[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => for updates in self.systems_debug[i].updates.iter(){
index.entry(*updates).or_insert_with(|| vec![]).push(system)
},
Priority::Barrier(_) => (),
}
}
index
}
fn creates_index(&self) -> HashMap<TypeId, Vec<Priority>>{
let mut index = HashMap::default();
for system in self.graph_nodes() {
match system{
Priority::Send(_) => (),
Priority::SendOnce(_) => (),
Priority::ThreadLocal(_) => (),
Priority::ThreadLocalOnce(_) => (),
Priority::Creation(i) => for creates in self.systems_creation[i].creates.iter(){
index.entry(*creates).or_insert_with(|| vec![]).push(system)
},
Priority::CreationOnce(i) => for creates in self.systems_creation_once[i].creates.iter(){
index.entry(*creates).or_insert_with(|| vec![]).push(system)
},
#[cfg(feature = "async")]
Priority::CreationAsync(i) => for creates in self.async_tasks[i].creates.iter(){
index.entry(*creates).or_insert_with(|| vec![]).push(system)
},
#[cfg(feature = "debug_parameters")]
Priority::Debug(_) => (),
Priority::Barrier(_) => (),
}
}
index
}
fn calculate_successors(&mut self) {
let barriers = unsafe{ delete_lifetime_mut(&mut self.barriers) };
let systems = unsafe{ delete_lifetime_mut(&mut self.systems) };
let systems_once = unsafe{ delete_lifetime_mut(&mut self.systems_once) };
let systems_thread_local = unsafe{ delete_lifetime_mut(&mut self.systems_thread_local) };
let systems_once_thread_local = unsafe{ delete_lifetime_mut(&mut self.systems_once_thread_local) };
let systems_creation = unsafe{ delete_lifetime_mut(&mut self.systems_creation) };
let systems_creation_once = unsafe{ delete_lifetime_mut(&mut self.systems_creation_once) };
#[cfg(feature = "async")]
let async_tasks = unsafe{ delete_lifetime_mut(&mut self.async_tasks) };
#[cfg(feature = "debug_parameters")]
let systems_debug = unsafe{ delete_lifetime_mut(&mut self.systems_debug) };
let needs_index = self.needs_index();
let updates_index = self.updates_index();
for system in systems.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
for system in systems_once.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
for system in systems_thread_local.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
for system in systems_once_thread_local.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
for system in systems_creation.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
for system in systems_creation_once.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
#[cfg(feature = "async")]
for system in async_tasks.values_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
#[cfg(feature = "debug_parameters")]
for system in systems_debug.iter_mut() {
system.successors.clear();
let exisiting_befores = system.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.successors.extend(exisiting_befores);
}
for barrier in barriers.iter_mut(){
barrier.successors.clear();
let exisiting_befores = barrier.before.iter()
.filter_map(|id| self.id_index.get(id).cloned());
barrier.successors.extend(exisiting_befores);
}
for (i, system) in systems.iter_mut().enumerate() {
let this = Priority::Send(i);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for (i, system) in systems_once.iter_mut().enumerate() {
let this = Priority::SendOnce(i);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for (i, system) in systems_thread_local.iter_mut().enumerate() {
let this = Priority::ThreadLocal(i);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for (i, system) in systems_once_thread_local.iter_mut().enumerate() {
let this = Priority::ThreadLocal(i);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for (i, system) in systems_creation.iter_mut().enumerate() {
let this = Priority::Creation(i);
let successors_from_resources = system.creates.iter()
.flat_map(|creates| {
updates_index.get(creates)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
let successors_from_resources = system.creates.iter()
.flat_map(|creates| {
needs_index.get(creates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for (i, system) in systems_creation_once.iter_mut().enumerate() {
let this = Priority::CreationOnce(i);
let successors_from_resources = system.creates.iter()
.flat_map(|creates| {
updates_index.get(creates)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
let successors_from_resources = system.creates.iter()
.flat_map(|creates| {
needs_index.get(creates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
#[cfg(feature = "async")]
for (i, system) in async_tasks.iter_mut() {
let this = Priority::CreationAsync(i);
let successors_from_resources = system.creates.iter()
.flat_map(|creates| {
updates_index.get(creates)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
let successors_from_resources = system.creates.iter()
.flat_map(|creates| {
needs_index.get(creates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
#[cfg(feature = "debug_parameters")]
for (i, system) in systems_debug.iter_mut().enumerate() {
let this = Priority::Debug(i);
let successors_from_resources = system.updates.iter()
.flat_map(|updates| {
needs_index.get(updates)
.map(|need| need.iter().cloned().filter(|need| *need != this).collect())
.unwrap_or_else(|| vec![])
});
system.successors.extend(successors_from_resources);
for predecessor in system.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for (i, barrier) in barriers.iter_mut().enumerate() {
let this = Priority::Barrier(i);
for predecessor in barrier.after.iter() {
if let Some(predecessor) = self.id_index.get(predecessor).cloned() {
self.add_succesor(predecessor, this);
}
}
}
for system in self.systems.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
for system in self.systems_once.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
for system in self.systems_thread_local.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
for system in self.systems_once_thread_local.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
for system in self.systems_creation.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
for system in self.systems_creation_once.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
#[cfg(feature = "async")]
for system in self.async_tasks.values_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
#[cfg(feature = "debug_parameters")]
for system in self.systems_debug.iter_mut(){
system.successors.sort_unstable();
system.successors.dedup();
}
for barrier in self.barriers.iter_mut(){
barrier.successors.sort_unstable();
barrier.successors.dedup();
}
let nodes = self.graph_nodes().collect::<Vec<_>>();
let topological_sort = topological_sort(&nodes, self.node_successors());
if topological_sort.is_ok() {
for system_i in nodes.iter() {
for system_j in nodes.iter() {
for system_k in nodes.iter() {
let succeded = {
let successors_i = self.system_successors(*system_i);
successors_i.contains(system_k)
&& successors_i.contains(system_j)
&& self.succeded_by(*system_j, *system_k)
};
if succeded {
let system_i_succs = self.system_successors_mut(*system_i);
if let Some(pos) = system_i_succs.iter().position(|p| p == system_k){
system_i_succs.swap_remove(pos);
}
}
}
}
}
}
self.needs_dependencies_refresh = false;
}
fn calculate_predecessors(&mut self) {
let barriers = unsafe{ delete_lifetime_mut(&mut self.barriers) };
let systems = unsafe{ delete_lifetime_mut(&mut self.systems) };
let systems_once = unsafe{ delete_lifetime_mut(&mut self.systems_once) };
let systems_thread_local = unsafe{ delete_lifetime_mut(&mut self.systems_thread_local) };
let systems_once_thread_local = unsafe{ delete_lifetime_mut(&mut self.systems_once_thread_local) };
let systems_creation = unsafe{ delete_lifetime_mut(&mut self.systems_creation) };
let systems_creation_once = unsafe{ delete_lifetime_mut(&mut self.systems_creation_once) };
#[cfg(feature = "async")]
let async_tasks = unsafe{ delete_lifetime_mut(&mut self.async_tasks) };
#[cfg(feature = "debug_parameters")]
let systems_debug = unsafe{ delete_lifetime_mut(&mut self.systems_debug) };
let creates_index = self.creates_index();
let updates_index = self.updates_index();
for system in systems.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
for system in systems_once.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
for system in systems_thread_local.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
for system in systems_once_thread_local.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
for system in systems_creation.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
for system in systems_creation_once.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
#[cfg(feature = "async")]
for system in async_tasks.values_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
#[cfg(feature = "debug_parameters")]
for system in systems_debug.iter_mut() {
system.predecessors.clear();
let exisiting_afters = system.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
system.predecessors.extend(exisiting_afters);
}
for barrier in barriers.iter_mut(){
barrier.predecessors.clear();
let exisiting_afters = barrier.after.iter()
.filter_map(|id| self.id_index.get(id).cloned());
barrier.predecessors.extend(exisiting_afters);
}
for (i, system) in systems.iter_mut().enumerate() {
let this = Priority::Send(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for (i, system) in systems_once.iter_mut().enumerate() {
let this = Priority::SendOnce(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for (i, system) in systems_thread_local.iter_mut().enumerate() {
let this = Priority::ThreadLocal(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for (i, system) in systems_once_thread_local.iter_mut().enumerate() {
let this = Priority::ThreadLocalOnce(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for (i, system) in systems_creation.iter_mut().enumerate() {
let this = Priority::Creation(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for (i, system) in systems_creation_once.iter_mut().enumerate() {
let this = Priority::CreationOnce(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
#[cfg(feature = "async")]
for (i, system) in async_tasks.iter_mut() {
let this = Priority::CreationAsync(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
#[cfg(feature = "debug_parameters")]
for (i, system) in systems_debug.iter_mut().enumerate() {
let this = Priority::Debug(i);
let predecessors_from_resources = system.updates.iter()
.flat_map(|updates| {
creates_index.get(updates)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
creates_index.get(needs)
.map(|create| create.iter().cloned().filter(|create| *create != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
let predecessors_from_resources = system.needs.iter()
.flat_map(|needs| {
updates_index.get(needs)
.map(|update| update.iter().cloned().filter(|update| *update != this).collect())
.unwrap_or_else(|| vec![])
});
system.predecessors.extend(predecessors_from_resources);
for successor in system.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for (i, barrier) in barriers.iter_mut().enumerate() {
let this = Priority::Barrier(i);
for successor in barrier.before.iter() {
if let Some(successor) = self.id_index.get(successor).cloned() {
self.add_predecesor(successor, this);
}
}
}
for system in self.systems.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
for system in self.systems_once.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
for system in self.systems_thread_local.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
for system in self.systems_once_thread_local.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
for system in self.systems_creation.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
for system in self.systems_creation_once.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
#[cfg(feature = "async")]
for system in self.async_tasks.values_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
#[cfg(feature = "debug_parameters")]
for system in self.systems_debug.iter_mut(){
system.predecessors.sort_unstable();
system.predecessors.dedup();
}
for barrier in self.barriers.iter_mut(){
barrier.predecessors.sort_unstable();
barrier.predecessors.dedup();
}
let nodes = self.graph_nodes().collect::<Vec<_>>();
let topological_sort = topological_sort(&nodes, self.node_successors());
if topological_sort.is_ok() {
for system_i in nodes.iter() {
for system_j in nodes.iter() {
for system_k in nodes.iter() {
let preceded = {
let predecessors_i = self.system_predecessors(*system_i);
predecessors_i.contains(system_k)
&& predecessors_i.contains(system_j)
&& self.preceded_by(*system_j, *system_k)
};
if preceded {
let system_i_precs = self.system_predecessors_mut(*system_i);
if let Some(pos) = system_i_precs.iter().position(|p| p == system_k){
system_i_precs.swap_remove(pos);
}
}
}
}
}
}
self.needs_predecessors_refresh = false;
}
pub fn render_dependency_graph<P: AsRef<Path>>(&mut self, file: P){
if self.needs_dependencies_refresh {
self.calculate_successors();
}
use std::fs::File;
let mut f = File::create(file).unwrap();
dot::render(self, &mut f).unwrap();
}
fn node_successors<'a>(&'a self) -> impl Fn(&Priority) -> iter::Cloned<slice::Iter<'a, Priority>>{
move |node: &Priority| match node{
Priority::Send(i) => self.systems[*i].successors.iter().cloned(),
Priority::SendOnce(i) => self.systems_once[*i].successors.iter().cloned(),
Priority::ThreadLocal(i) => self.systems_thread_local[*i].successors.iter().cloned(),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[*i].successors.iter().cloned(),
Priority::Creation(i) => self.systems_creation[*i].successors.iter().cloned(),
Priority::CreationOnce(i) => self.systems_creation_once[*i].successors.iter().cloned(),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[*i].predecessors.iter().cloned(),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[*i].successors.iter().cloned(),
Priority::Barrier(i) => self.barriers[*i].successors.iter().cloned(),
}
}
fn node_predecessors<'a>(&'a self) -> impl Fn(&Priority) -> iter::Cloned<slice::Iter<'a, Priority>>{
move |node: &Priority| match node{
Priority::Send(i) => self.systems[*i].predecessors.iter().cloned(),
Priority::SendOnce(i) => self.systems_once[*i].predecessors.iter().cloned(),
Priority::ThreadLocal(i) => self.systems_thread_local[*i].predecessors.iter().cloned(),
Priority::ThreadLocalOnce(i) => self.systems_once_thread_local[*i].predecessors.iter().cloned(),
Priority::Creation(i) => self.systems_creation[*i].predecessors.iter().cloned(),
Priority::CreationOnce(i) => self.systems_creation_once[*i].predecessors.iter().cloned(),
#[cfg(feature = "async")]
Priority::CreationAsync(i) => self.async_tasks[*i].predecessors.iter().cloned(),
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => self.systems_debug[*i].predecessors.iter().cloned(),
Priority::Barrier(i) => self.barriers[*i].predecessors.iter().cloned(),
}
}
pub fn graph_nodes(&self) -> impl Iterator<Item = Priority> + '_{
let send_nodes = (0 .. self.systems.len()).map(|i| Priority::Send(i));
let send_once_nodes = (0 .. self.systems_once.len()).map(|i| Priority::SendOnce(i));
let tl_nodes = (0 .. self.systems_thread_local.len()).map(|i| Priority::ThreadLocal(i));
let tl_once_nodes = (0 .. self.systems_once_thread_local.len()).map(|i| Priority::ThreadLocalOnce(i));
let creation_nodes = (0 .. self.systems_creation.len()).map(|i| Priority::Creation(i));
let creation_once_nodes = (0 .. self.systems_creation_once.len()).map(|i| Priority::CreationOnce(i));
#[cfg(feature = "async")]
let creation_nodes_async = self.async_tasks.keys().map(|i| Priority::CreationAsync(i));
#[cfg(not(feature = "async"))]
let creation_nodes_async = (0 .. 0).map(|i| Priority::Send(i));
#[cfg(feature = "debug_parameters")]
let debug_nodes = (0 .. self.systems_debug.len()).map(|i| Priority::Debug(i));
#[cfg(not(feature = "debug_parameters"))]
let debug_nodes = (0 .. 0).map(|i| Priority::Send(i));
let barriers = (0..self.barriers.len()).map(|i| Priority::Barrier(i));
send_nodes
.chain(send_once_nodes)
.chain(tl_nodes)
.chain(tl_once_nodes)
.chain(creation_nodes)
.chain(creation_once_nodes)
.chain(creation_nodes_async)
.chain(debug_nodes)
.chain(barriers)
}
pub fn topological_sort(&mut self, inverse_trait_index: &HashMap<TypeId, TypeId>) -> Result<Vec<(Priority, OperatorId)>, Vec<String>> {
if self.needs_dependencies_refresh {
self.calculate_successors();
}
if self.needs_predecessors_refresh {
self.calculate_predecessors();
}
let nodes = self.graph_nodes().collect::<Vec<_>>();
topological_sort(&nodes, self.node_successors())
.map(|systems| systems.into_iter().map(|system| {
let operator_id = match system {
Priority::Send(i) => {
let reads = self.systems[i].reads.iter()
.chain(&self.systems[i].needs)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Read(*id), OperatorId::Read(*trait_id)])
}else{
OperatorId::Read(*id)
});
let writes = self.systems[i].writes.iter()
.chain(&self.systems[i].updates)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Write(*id), OperatorId::Write(*trait_id)])
}else{
OperatorId::Write(*id)
});
OperatorId::Several(reads.chain(writes).collect())
}
Priority::SendOnce(i) => {
let reads = self.systems_once[i].reads.iter()
.chain(&self.systems_once[i].needs)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Read(*id), OperatorId::Read(*trait_id)])
}else{
OperatorId::Read(*id)
});
let writes = self.systems_once[i].writes.iter()
.chain(&self.systems_once[i].updates)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Write(*id), OperatorId::Write(*trait_id)])
}else{
OperatorId::Write(*id)
});
OperatorId::Several(reads.chain(writes).collect())
}
Priority::ThreadLocal(i) => {
let reads = self.systems_thread_local[i].reads.iter()
.chain(&self.systems_thread_local[i].needs)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Read(*id), OperatorId::Read(*trait_id)])
}else{
OperatorId::Read(*id)
});
let writes = self.systems_thread_local[i].writes.iter()
.chain(&self.systems_thread_local[i].updates)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Write(*id), OperatorId::Write(*trait_id)])
}else{
OperatorId::Write(*id)
});
OperatorId::Several(reads.chain(writes).collect())
}
Priority::ThreadLocalOnce(i) => {
let reads = self.systems_once_thread_local[i].reads.iter()
.chain(&self.systems_once_thread_local[i].needs)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Read(*id), OperatorId::Read(*trait_id)])
}else{
OperatorId::Read(*id)
});
let writes = self.systems_once_thread_local[i].writes.iter()
.chain(&self.systems_once_thread_local[i].updates)
.map(|id| if let Some(trait_id) = inverse_trait_index.get(id){
OperatorId::Several(vec![OperatorId::Write(*id), OperatorId::Write(*trait_id)])
}else{
OperatorId::Write(*id)
});
OperatorId::Several(reads.chain(writes).collect())
}
Priority::Creation(_)
| Priority::CreationOnce(_)
| Priority::Barrier(_) =>
{
OperatorId::None
}
#[cfg(feature="async")]
Priority::CreationAsync(_) => OperatorId::None,
#[cfg(feature="debug_parameters")]
Priority::Debug(_) => OperatorId::None,
};
(system, operator_id)
}).collect())
.map_err(|node_in_loop| {
let short_loop = bfs_loop(&node_in_loop, self.node_successors());
short_loop.unwrap().into_iter().map(|p| self.system_name(p)).collect::<Vec<_>>()
})
}
pub fn strongly_connected_components(&mut self) -> Vec<Vec<String>>{
if self.needs_dependencies_refresh {
self.calculate_successors();
}
let nodes = self.graph_nodes().collect::<Vec<_>>();
strongly_connected_components(&nodes, self.node_successors())
.into_iter()
.map(|ps| ps.into_iter().map(|p| self.system_name(p)).collect())
.collect()
}
#[inline]
pub fn run_one(&mut self,
next: Priority,
storages: &Storages,
resources: &mut ResourcesContainer,
next_guid: &AtomicUsize,
#[cfg(feature="dynamic_systems")]
dynamic_systems: &DynamicSystemsLoader) -> bool
{
let system_info = self.system_file_line_info(next);
match next{
Priority::Send(i) => {
self.run_send_system(i, storages, resources);
false
}
Priority::SendOnce(i) => {
self.run_send_once_system(i, storages, resources);
false
}
Priority::ThreadLocal(i) => {
self.run_thread_local_system(i, storages, resources);
false
}
Priority::ThreadLocalOnce(i) => {
self.run_thread_local_once_system(i, storages, resources);
false
}
Priority::Creation(i) => {
let entities = EntitiesCreation{
storages: storages.substorages_all(),
next_guid: next_guid.clone(),
};
let resources = ResourcesCreation{
resources: resources,
system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
};
self.run_creation_system(i, entities, resources);
false
}
Priority::CreationOnce(i) => {
let entities = EntitiesCreation{
storages: storages.substorages_all(),
next_guid: next_guid.clone(),
};
let resources = ResourcesCreation{
resources,
system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
};
self.run_creation_system_once(i, entities, resources);
false
}
#[cfg(feature = "async")]
Priority::CreationAsync(i) => {
let entities = CreationProxy{
storages: storages.clone(),
next_guid: next_guid.clone(),
resources: resources.clone(),
};
let mut resources = ResourcesThreadLocal{
resources: unsafe{ & *resources.get() },
resource_mask_r: None,
resource_mask_w: None,
system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
};
self.run_creation_system_async(i, entities, resources)
}
#[cfg(feature="debug_parameters")]
Priority::Debug(i) => {
let entities = EntitiesDebug{
storages: storages.substorages_all(),
};
let resources = ResourcesThreadLocal{
resources,
resource_mask_r: None,
resource_mask_w: None,
system_info,
#[cfg(feature="dynamic_systems")]
dynamic_systems,
};
self.run_debug_system(i, entities, resources);
false
}
Priority::Barrier(_) => false,
}
}
pub fn add_barrier<B: super::Barrier + 'static>(&mut self){
let barrier = Barrier{
name: B::name(),
after: B::after(),
before: B::before(),
successors: vec![],
predecessors: vec![],
};
let priority = Priority::Barrier(self.barriers.len());
self.barriers.push(barrier);
self.needs_dependencies_refresh = true;
self.needs_predecessors_refresh = true;
self.id_index.insert(SystemId::Barrier(TypeId::of::<B>()), priority);
}
pub fn add_system<S, TraitObject>(&mut self,
system: S,
stat: Option<StatsType>,
checks: Option<SystemConditionElse<TraitObject>>,
needs: Option<Vec<TypeId>>,
updates: Option<Vec<TypeId>>,
reads: Option<Vec<TypeId>>,
writes: Option<Vec<TypeId>>,
)
where S: AnySystem<TraitObject> + 'static,
{
let mut name = stat.map(|stat| stat.name().to_owned());
let priority = {
if let Some(name) = name.as_mut(){
while S::iter(self)
.find(|system| system.name.map(|n| n == name).unwrap_or(false))
.is_some()
{
*name += "_"
}
}
system.insert_trait_object(checks, needs, updates, reads, writes, self)
};
self.id_index.insert(SystemId::TypeId(TypeId::of::<S>()), priority);
if let Some(name) = stat.map(|stat| stat.name().to_owned()){
self.id_index.insert(SystemId::Name(name), priority);
}
#[cfg(feature="stats_events")]
{
if let Some(stat) = stat {
self.stats_events.insert(priority, (name.clone().unwrap(), SenderRc::new()));
if let StatsType::Gpu(_) = stat{
self.gpu_stats_events.insert(priority, (name.clone().unwrap(), SenderRc::new()));
}
self.enabled_systems.insert(priority, Property::new(true));
}
}
self.needs_dependencies_refresh = true;
self.needs_predecessors_refresh = true;
}
#[cfg(feature = "async")]
pub fn add_creation_system_async<S>(&mut self,
future: Pin<Box<dyn Future<Output=()>>>,
stat: Option<StatsType>,
checks: Option<SystemConditionElse<Box<dyn CreationSystemAsync>>>)
where S: CreationSystemAsync + 'static
{
let mut name = stat.map(|stat| stat.name().to_owned());
let priority = {
if let Some(name) = name.as_mut(){
while self.async_tasks.values()
.find(|system| system.name.map(|n| n == name).unwrap_or(false))
.is_some()
{
*name += "_"
}
}
let task = Arc::new(Task::new(future));
let (checks, els) = checks
.map(|checks| checks.condition_else())
.unwrap_or((None, None));
let system_info = SystemInfo{
system: task,
name: S::name(),
checks,
els: None,
before: S::before(),
after: S::after(),
updates: S::updates(),
needs: S::needs(),
creates: S::creates(),
reads: S::reads(),
writes: S::writes(),
successors: vec![],
predecessors: vec![],
};
let priority = self.async_tasks.insert_key_gen(system_info);
Priority::CreationAsync(priority)
};
self.id_index.insert(SystemId::TypeId(TypeId::of::<S>()), priority);
if let Some(name) = stat.map(|stat| stat.name().to_owned()){
self.id_index.insert(SystemId::Name(name), priority);
}
#[cfg(feature="stats_events")]
{
if let Some(stat) = stat {
self.stats_events.insert(priority, (name.clone().unwrap(), SenderRc::new()));
if let StatsType::Gpu(_) = stat{
self.gpu_stats_events.insert(priority, (name.clone().unwrap(), SenderRc::new()));
}
self.enabled_systems.insert(priority, Property::new(true));
}
}
self.needs_dependencies_refresh = true;
self.needs_predecessors_refresh = true;
}
#[cfg(feature="stats_events")]
pub fn stats(&mut self) -> impl Iterator<Item = (&str, Property<'static, Stat>)>{
self.stats_events.iter_mut()
.map(|(_, (name, sender))| (name.as_str(), sender.stream().to_property(Stat::default())))
}
#[cfg(all(feature="stats_events", feature="glin"))]
pub fn gpu_stats<C: glin::CreationContext>(&mut self, gl: &C) -> impl Iterator<Item = (&str, Property<'static, Duration>)>{
for (prio, (_, _)) in self.gpu_stats_events.iter(){
if !self.gpu_stats_counters.contains_key(prio){
self.gpu_stats_counters.insert(prio, gl.new_duration_query());
}
}
self.gpu_stats_events.iter_mut()
.map(|(_, (name, sender))| (name.as_str(), sender.stream().to_property(Duration::default())))
}
#[cfg(feature="stats_events")]
pub fn enabled_systems(&mut self) -> impl Iterator<Item = (&str, Property<'static, bool>)>{
self.enabled_systems.iter().zip(self.stats_events.iter())
.map(|((_, enabled), (_, (name,_)))| (name.as_str(), enabled.clone()))
}
#[cfg(feature="debug_parameters")]
pub fn debug_system_name(&self, i: usize) -> Option<&str>{
self.systems_debug[i].name
}
#[cfg(feature="stats_events")]
pub fn send_stats(&self){
for (prio, stat) in self.stats.iter(){
if let Some((_, sender)) = self.stats_events.get(prio){
sender.send(stat.clone())
}
}
for stat in self.gpu_stats.iter() {
if let Some((_, sender)) = self.gpu_stats_events.get(stat.0){
sender.send(stat.1)
}
}
}
#[cfg(feature="stats_events")]
pub fn previous_duration(&self, priority: Priority) -> Duration {
self.prev_stats.get(priority).map(|s| s.duration()).unwrap_or_default()
}
#[cfg(feature="stats_events")]
pub fn export_tracing_stats<P: AsRef<Path>>(&self, path: P){
if let Some(mut min) = self.stats.0[0].values().next().map(|s| s.start()){
for (_, stat) in self.stats.iter() {
min = stat.start().min(min);
}
let tracing_stats = self.stats.iter().flat_map(|(prio, stat)|{
if let Some((name, _)) = self.stats_events.get(prio){
let start = TracingStat{
cat: "".to_string(),
pid: 0,
tid: stat.thread_id(),
#[cfg(feature = "stdweb")]
ts: ((stat.start() - min) * 1000.) as u64,
#[cfg(not(feature = "stdweb"))]
ts: (stat.start() - min).as_micros() as u64,
ph: 'B',
name: name.clone(),
};
let end = TracingStat{
cat: "".to_string(),
pid: 0,
tid: stat.thread_id(),
#[cfg(feature = "stdweb")]
ts: ((stat.start() - min) * 1000.) as u64,
#[cfg(not(feature = "stdweb"))]
ts: (stat.end() - min).as_micros() as u64,
ph: 'E',
name: name.clone(),
};
vec![start, end]
}else{
vec![]
}
}).collect::<Vec<_>>();
let stats = serde_json::to_string(&tracing_stats).unwrap();
let mut file = std::fs::File::create(path).unwrap();
file.write_all(stats.as_bytes()).unwrap();
}
}
pub fn needs_predecessors_refresh(&self) -> bool {
self.needs_predecessors_refresh
}
fn set_system_masks(&mut self, priority: Priority, mask_r: &MaskType, mask_w: &MaskType) {
match priority {
Priority::Send(i) => {
self.systems[i].resource_mask_r = mask_r | mask_w;
self.systems[i].resource_mask_w = mask_w.clone();
},
Priority::SendOnce(i) => {
self.systems_once[i].resource_mask_r = mask_r | mask_w;
self.systems_once[i].resource_mask_w = mask_w.clone();
},
Priority::ThreadLocal(i) => {
self.systems_thread_local[i].resource_mask_r = mask_r | mask_w;
self.systems_thread_local[i].resource_mask_w = mask_w.clone();
},
Priority::ThreadLocalOnce(i) => {
self.systems_once_thread_local[i].resource_mask_r = mask_r | mask_w;
self.systems_once_thread_local[i].resource_mask_w = mask_w.clone();
},
Priority::Creation(i) => {
self.systems_creation[i].resource_mask_r = mask_r | mask_w;
self.systems_creation[i].resource_mask_w = mask_w.clone();
},
Priority::CreationOnce(i) => {
self.systems_creation_once[i].resource_mask_r = mask_r | mask_w;
self.systems_creation_once[i].resource_mask_w = mask_w.clone();
},
#[cfg(feature = "async")]
Priority::CreationAsync(i) => {
self.async_tasks[i].resource_mask_r = mask_r | mask_w;
self.async_tasks[i].resource_mask_w = mask_w.clone();
},
#[cfg(feature = "debug_parameters")]
Priority::Debug(i) => {
self.systems_debug[i].resource_mask_r = mask_r | mask_w;
self.systems_debug[i].resource_mask_w = mask_w.clone();
},
Priority::Barrier(_) => ()
}
}
pub fn predecessors_cache(&mut self, inverse_trait_index: &HashMap<TypeId, TypeId>) -> PredecessorsCache{
if self.needs_dependencies_refresh {
self.calculate_successors();
}
if self.needs_predecessors_refresh{
self.calculate_predecessors()
}
let mut predecessors_index: HashMap<Priority, SystemMask> = HashMap::default();
let mut predecessors_next_mask = mask::one();
let mut resources_index: HashMap<TypeId, ResourceMask> = HashMap::default();
let mut resources_next_mask = resource_mask::one();
let mut total_resources = 0;
let mut total_systems = 0;
let mut once_systems_mask = mask::zero();
for priority in self.graph_nodes(){
total_systems += 1;
if total_systems > MAX_SYSTEMS{
panic!("Trying to register more than {} systems, please use the corresponding max_systems_* feature in your cargo dependency", MAX_SYSTEMS);
}
let next = predecessors_next_mask.clone();
predecessors_next_mask *= SystemMask::from(2u8);
predecessors_index.insert(priority, next.clone());
if let Priority::SendOnce(_) | Priority::ThreadLocalOnce(_) | Priority::CreationOnce(_) = priority {
once_systems_mask |= next;
}
};
let storage = self.graph_nodes().map(|priority| {
let mask = predecessors_index[&priority].clone();
let predecessors = self.system_predecessors(priority).iter()
.map(|predecessor| predecessors_index[predecessor].clone())
.fold(mask::zero(), |acc, mask| acc | mask);
let writes = self.system_updates(priority).iter()
.chain(self.system_writes(priority).iter())
.chain(self.system_updates(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.chain(self.system_writes(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.map(|update| resources_index.entry(*update).or_insert_with(|| {
let next = resources_next_mask.clone();
resources_next_mask *= ResourceMask::from(2u8);
total_resources += 1;
if total_resources > MAX_COMPONENTS{
panic!("Trying to register more than {} resources through system dependencies, please use the corresponding max_components_* feature in your cargo dependency", MAX_COMPONENTS);
}
next
}).clone()).fold(resource_mask::zero(), |acc, mask| acc | mask);
let mut write_bits = self.system_updates(priority).iter()
.chain(self.system_writes(priority).iter())
.chain(self.system_updates(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.chain(self.system_writes(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.map(|update| resources_index[update].trailing_zeros() as usize)
.collect::<Vec<_>>();
write_bits.sort_unstable();
let reads = self.system_needs(priority).iter()
.chain(self.system_reads(priority).iter())
.chain(self.system_needs(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.chain(self.system_reads(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.map(|need| resources_index.entry(*need).or_insert_with(|| {
let next = resources_next_mask.clone();
resources_next_mask *= ResourceMask::from(2u8);
total_resources += 1;
if total_resources > MAX_COMPONENTS{
panic!("Trying to register more than 64 resources through system dependencies, please use the corresponding max_components_* feature in your cargo dependency");
}
next
}).clone()).fold(resource_mask::zero(), |acc, mask| acc | mask);
let mut read_bits = self.system_needs(priority).iter()
.chain(self.system_reads(priority).iter())
.chain(self.system_needs(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.chain(self.system_reads(priority).iter().filter_map(|id| inverse_trait_index.get(id)))
.map(|need| resources_index[need].trailing_zeros() as usize)
.collect::<Vec<_>>();
read_bits.sort_unstable();
let reads_writes = (&reads) | (&writes);
SystemCache{
priority,
mask,
original_predecessors: predecessors.clone(),
predecessors,
writes,
reads,
reads_writes,
write_bits,
read_bits,
run: false,
}
}).collect::<Vec<_>>();
for system in &storage {
self.set_system_masks(
system.priority,
&system.reads,
&system.writes
);
}
PredecessorsCache{
storage,
resources_index: Some(resources_index),
send_writing: resource_mask::zero(),
tl_writing: resource_mask::zero(),
send_reading: vec![0; total_resources],
tl_reading: vec![0; total_resources],
tasks_done: mask::zero(),
exclusive_running: 0,
total_systems_run: 0,
tl_systems_running: 0,
send_systems_running: 0,
waiting_send_systems: false,
once_systems_mask,
}
}
}
struct SystemCache{
priority: Priority,
mask: SystemMask,
predecessors: SystemMask,
writes: ResourceMask,
reads: ResourceMask,
reads_writes: ResourceMask,
write_bits: Vec<usize>,
read_bits: Vec<usize>,
original_predecessors: SystemMask,
run: bool,
}
impl SystemCache{
#[inline]
fn writes(&self) -> &ResourceMask{
&self.writes
}
#[inline]
fn reads_writes(&self) -> &ResourceMask{
&self.reads_writes
}
#[inline]
fn mask(&self) -> &SystemMask{
&self.mask
}
}
#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
pub struct SystemHandle(usize);
#[derive(Default)]
pub struct PredecessorsCache{
storage: Vec<SystemCache>,
resources_index: Option<HashMap<TypeId, ResourceMask>>,
send_writing: ResourceMask,
tl_writing: ResourceMask,
send_reading: Vec<u8>,
tl_reading: Vec<u8>,
exclusive_running: usize,
tasks_done: SystemMask,
total_systems_run: usize,
send_systems_running: usize,
tl_systems_running: usize,
waiting_send_systems: bool,
once_systems_mask: SystemMask,
}
impl PredecessorsCache{
#[inline]
pub fn all_done(&self) -> bool{
self.total_systems_run == self.storage.len()
}
#[inline]
fn send_writing(&self) -> &ResourceMask{
&self.send_writing
}
#[inline]
fn tl_writing(&self) -> &ResourceMask{
&self.tl_writing
}
pub fn done(&mut self, system: SystemHandle){
let system = &self.storage[system.0];
self.tasks_done |= system.mask();
match system.priority{
Priority::Send(_) | Priority::SendOnce(_) => {
self.send_systems_running -= 1;
self.send_writing &= !system.writes();
for bit in system.read_bits.iter(){
unsafe{ *self.send_reading.get_unchecked_mut(*bit) -= 1 };
}
}
Priority::ThreadLocal(_) | Priority::ThreadLocalOnce(_) => {
self.tl_systems_running -= 1;
self.tl_writing &= !system.writes();
for bit in system.read_bits.iter(){
unsafe{ *self.tl_reading.get_unchecked_mut(*bit) -= 1 };
}
}
Priority::Creation(_) | Priority::CreationOnce(_) => self.exclusive_running -= 1,
#[cfg(feature="async")]
Priority::CreationAsync(_) => self.exclusive_running -= 1,
#[cfg(feature="debug_parameters")]
Priority::Debug(_) => self.exclusive_running -= 1,
Priority::Barrier(_) => (),
}
}
pub fn reset(&mut self) {
if self.once_systems_mask != mask::zero() {
let once_systems_mask = mem::replace(&mut self.once_systems_mask, mask::zero());
self.storage.retain_mut(|system| {
if let Priority::SendOnce(_) | Priority::ThreadLocalOnce(_) | Priority::CreationOnce(_) = system.priority {
false
}else{
system.original_predecessors &= !once_systems_mask.clone();
true
}
});
}
for system in self.storage.iter_mut(){
system.predecessors = system.original_predecessors.clone();
system.run = false;
}
self.send_writing = resource_mask::zero();
self.tl_writing = resource_mask::zero();
let num_reads = self.send_reading.len();
self.send_reading = vec![0; num_reads];
self.tl_reading = vec![0; num_reads];
self.exclusive_running = 0;
self.tasks_done = mask::zero();
self.total_systems_run = 0;
self.send_systems_running = 0;
}
pub fn systems_running(&self) -> usize{
self.send_systems_running + self.tl_systems_running + self.exclusive_running
}
pub fn send_systems_running(&self) -> usize{
self.send_systems_running
}
pub fn iter(&mut self) -> PredecessorsIter{
PredecessorsIter{
total_writing: self.send_writing() | self.tl_writing(),
iter: self.storage.iter_mut().enumerate(),
tasks_done: &mut self.tasks_done,
waiting_send_systems: &mut self.waiting_send_systems,
exclusive_running: &mut self.exclusive_running,
send_reading: &mut self.send_reading,
tl_reading: &mut self.tl_reading,
send_writing: &mut self.send_writing,
tl_writing: &mut self.tl_writing,
total_systems_run: &mut self.total_systems_run,
send_systems_running: &mut self.send_systems_running,
tl_systems_running: &mut self.tl_systems_running,
}
}
pub fn take_resources_index(&mut self) -> HashMap<TypeId, ResourceMask> {
self.resources_index.take().unwrap()
}
pub fn system_priority(&self, system: SystemHandle) -> Priority{
self.storage[system.0].priority
}
}
pub struct PredecessorsIter<'a>{
iter: iter::Enumerate<slice::IterMut<'a, SystemCache>>,
total_writing: ResourceMask,
tasks_done: &'a mut SystemMask,
waiting_send_systems: &'a mut bool,
exclusive_running: &'a mut usize,
send_reading: &'a mut [u8],
tl_reading: &'a mut [u8],
send_writing: &'a mut ResourceMask,
tl_writing: &'a mut ResourceMask,
total_systems_run: &'a mut usize,
send_systems_running: &'a mut usize,
tl_systems_running: &'a mut usize,
}
impl<'a> PredecessorsIter<'a>{
#[inline]
fn send_writing(&self) -> &ResourceMask{
self.send_writing
}
#[inline]
fn tl_writing(&self) -> &ResourceMask{
self.tl_writing
}
#[inline]
fn tasks_done(&self) -> &SystemMask{
self.tasks_done
}
}
impl<'a> Iterator for PredecessorsIter<'a>{
type Item = (Priority, SystemHandle);
fn next(&mut self) -> Option<(Priority, SystemHandle)>{
while let Some((index, system)) = self.iter.next() {
if system.run {
continue
}
system.predecessors &= !self.tasks_done();
if system.predecessors != mask::zero() {
continue;
}
match system.priority {
Priority::Send(_) | Priority::SendOnce(_) => {
if *self.waiting_send_systems || *self.exclusive_running > 0{
continue;
}
let mut clashes = (system.reads_writes() & (&self.total_writing)) != resource_mask::zero();
clashes = clashes || system.write_bits.iter()
.any(|bit| unsafe{
*self.send_reading.get_unchecked(*bit) > 0
|| *self.tl_reading.get_unchecked(*bit) > 0
});
if !clashes {
*self.send_writing |= system.writes();
self.total_writing = self.send_writing() | self.tl_writing();
for bit in system.read_bits.iter(){
unsafe{ *self.send_reading.get_unchecked_mut(*bit) += 1 };
}
system.run = true;
*self.total_systems_run += 1;
*self.send_systems_running += 1;
return Some((system.priority, SystemHandle(index)));
}
}
Priority::ThreadLocal(_) | Priority::ThreadLocalOnce(_) => {
if *self.waiting_send_systems || *self.exclusive_running > 0{
continue;
}
let mut clashes = (system.reads_writes() & self.send_writing()) != resource_mask::zero();
clashes = clashes || system.write_bits.iter()
.any(|bit| unsafe{
*self.send_reading.get_unchecked(*bit) > 0
});
if !clashes {
*self.tl_writing |= system.writes();
self.total_writing = self.send_writing() | self.tl_writing();
for bit in system.read_bits.iter(){
unsafe{ *self.tl_reading.get_unchecked_mut(*bit) += 1 };
}
system.run = true;
*self.total_systems_run += 1;
*self.tl_systems_running += 1;
return Some((system.priority, SystemHandle(index)));
}
}
Priority::Creation(_) | Priority::CreationOnce(_)=> {
if !*self.waiting_send_systems && *self.send_systems_running > 0 {
*self.waiting_send_systems = true;
}else if *self.send_systems_running == 0 {
system.run = true;
*self.exclusive_running += 1;
*self.total_systems_run += 1;
*self.waiting_send_systems = false;
return Some((system.priority, SystemHandle(index)));
}
}
#[cfg(feature="async")]
Priority::CreationAsync(_) => {
if !*self.waiting_send_systems && *self.send_systems_running > 0 {
*self.waiting_send_systems = true;
}else if *self.send_systems_running == 0 {
system.run = true;
*self.exclusive_running += 1;
*self.total_systems_run += 1;
*self.waiting_send_systems = false;
return Some((system.priority, SystemHandle(index)));
}
}
#[cfg(feature="debug_parameters")]
Priority::Debug(_) => {
if !*self.waiting_send_systems && *self.send_systems_running > 0 {
*self.waiting_send_systems = true;
}else if *self.send_systems_running == 0 {
system.run = true;
*self.exclusive_running += 1;
*self.total_systems_run += 1;
*self.waiting_send_systems = false;
return Some((system.priority, SystemHandle(index)));
}
}
Priority::Barrier(_) => {
system.run = true;
*self.total_systems_run += 1;
return Some((system.priority, SystemHandle(index)));
}
}
}
*self.tasks_done = mask::zero();
None
}
}
use std::borrow::Cow;
impl<'a> dot::Labeller<'a, Priority, (Priority, Priority)> for Systems{
fn graph_id(&'a self) -> dot::Id<'a> { dot::Id::new("systems").unwrap() }
fn node_id(&'a self, n: &Priority) -> dot::Id<'a> {
dot::Id::new(n.graph_id()).unwrap()
}
fn node_label<'b>(&'b self, n: &Priority) -> dot::LabelText<'b> {
dot::LabelText::LabelStr(self.system_name(*n).into())
}
}
impl<'a> dot::GraphWalk<'a, Priority, (Priority, Priority)> for Systems {
fn nodes(&self) -> dot::Nodes<'a, Priority> {
Cow::Owned(self.graph_nodes().collect::<Vec<_>>())
}
fn edges(&'a self) -> dot::Edges<'a, (Priority, Priority)> {
let edges_send = self.systems.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::Send(i), *succ))
});
let edges_send_once = self.systems_once.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::SendOnce(i), *succ))
});
let edges_tl = self.systems_thread_local.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::ThreadLocal(i), *succ))
});
let edges_tl_once = self.systems_once_thread_local.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::ThreadLocalOnce(i), *succ))
});
let edges_creation = self.systems_creation.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::Creation(i), *succ))
});
let edges_creation_once = self.systems_creation_once.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::CreationOnce(i), *succ))
});
#[cfg(feature = "async")]
let edges_creation_async = self.async_tasks.iter().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::Creation(i), *succ))
});
#[cfg(not(feature = "async"))]
let edges_creation_async = None;
#[cfg(feature = "debug_parameters")]
let edges_debug = self.systems_debug.iter().enumerate().flat_map(|(i, system)|{
system.successors.iter().map(move |succ| (Priority::Debug(i), *succ))
});
#[cfg(not(feature = "debug_parameters"))]
let edges_debug = None;
let edges_barriers = self.barriers.iter().enumerate().flat_map(|(i,barrier)| {
barrier.successors.iter().map(move |succ| (Priority::Barrier(i), *succ))
});
Cow::Owned(edges_send
.chain(edges_send_once)
.chain(edges_tl)
.chain(edges_tl_once)
.chain(edges_creation)
.chain(edges_creation_once)
.chain(edges_creation_async)
.chain(edges_debug)
.chain(edges_barriers)
.collect())
}
fn source(&self, e: &(Priority, Priority)) -> Priority { e.0 }
fn target(&self, e: &(Priority, Priority)) -> Priority { e.1 }
}
#[derive(Clone)]
pub struct GroupSystem{
group: usize,
system: usize,
}