use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};
/// True when logging support is compiled in: either the `rayon_rs_log` cfg
/// flag is set or debug assertions are enabled. When this is false,
/// `Logger::new` always returns a disabled logger and `Logger::log` returns
/// immediately without evaluating its closure.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));
/// A record handed to `Logger::log` and forwarded over a channel to the
/// background logger thread.
///
/// Only some variants change the `SimulatorState` model in this file; the
/// others are untracked (`simulate` returns `false` for them). Untracked
/// events are still printed in `tail` and `all` mode, but are not written in
/// `profile` mode, which dumps only events the simulator tracks.
///
/// NOTE(review): the `*_addr` fields are never read in this file; they are
/// presumably latch addresses supplied by the caller for debugging -- confirm
/// against the call sites.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
/// In profile mode, ends the current batch so buffered events are
/// processed and written out. Skipped (never buffered) in tail mode.
Flush,
/// Simulator: marks `worker` as `Working`.
ThreadStart {
worker: usize,
terminate_addr: usize,
},
/// Simulator: marks `worker` as `Terminated`.
ThreadTerminate { worker: usize },
/// Simulator: `worker` must be `Working`; it becomes `Idle`.
ThreadIdle { worker: usize, latch_addr: usize },
/// Simulator: marks `worker` as `Working`.
ThreadFoundWork { worker: usize, yields: u32 },
/// Not tracked by the simulator.
ThreadSawLatchSet { worker: usize, latch_addr: usize },
/// Not tracked by the simulator.
ThreadSleepy { worker: usize, jobs_counter: usize },
/// Not tracked by the simulator.
ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },
/// Not tracked by the simulator.
ThreadSleepInterruptedByJob { worker: usize },
/// Simulator: `worker` must be `Idle`; it becomes `Sleeping`.
ThreadSleeping { worker: usize, latch_addr: usize },
/// Simulator: `worker` must be `Notified`; it becomes `Idle`.
ThreadAwoken { worker: usize, latch_addr: usize },
/// Simulator: `worker` must be `Sleeping`; it becomes `Notified`.
ThreadNotify { worker: usize },
/// Simulator: increments the local queue size of `worker`.
JobPushed { worker: usize },
/// Simulator: decrements the local queue size of `worker`.
JobPopped { worker: usize },
/// Simulator: decrements the local queue size of `victim`; `worker` is
/// not used by the simulator.
JobStolen { worker: usize, victim: usize },
/// Simulator: adds `count` to the injector size.
JobsInjected { count: usize },
/// Simulator: decrements the injector size by one.
JobUninjected { worker: usize },
/// Not tracked by the simulator.
JobThreadCounts {
worker: usize,
num_idle: u16,
num_sleepers: u16,
},
}
/// Cloneable handle used to submit `Event`s to the background logger thread.
///
/// Cloning the handle clones the channel sender, so all clones feed the same
/// logger thread.
#[derive(Clone)]
pub(super) struct Logger {
// `None` when logging is disabled (see `Logger::disabled`); `Logger::log`
// then does nothing.
sender: Option<Sender<Event>>,
}
impl Logger {
    /// Creates a logger for a pool of `num_workers` threads, configured by the
    /// `RAYON_LOG` environment variable.
    ///
    /// Returns a disabled logger when `LOG_ENABLED` is false or when
    /// `RAYON_LOG` is unset (or cannot be read as Unicode). Otherwise a
    /// background thread is spawned according to the variable's value:
    ///
    /// * `tail:<file>` -- keep only the most recent 10,000 events and write
    ///   them to `<file>` once the event stream goes quiet;
    /// * `profile:<file>` -- write tracked events to `<file>` in batches;
    /// * `all` -- write every event to stderr as it arrives.
    ///
    /// # Panics
    ///
    /// Panics if `RAYON_LOG` is set to any other value.
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if let Some(filename) = env_log.strip_prefix("tail:") {
            let filename = filename.to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if let Some(filename) = env_log.strip_prefix("profile:") {
            let filename = filename.to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            // List every accepted form, including `all`.
            panic!("RAYON_LOG should be 'tail:<file>', 'profile:<file>', or 'all'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    /// Returns a logger that drops every event.
    fn disabled() -> Logger {
        Logger { sender: None }
    }

    /// Sends the event produced by `event` to the logger thread.
    ///
    /// The closure is only invoked when logging is compiled in and this
    /// logger is enabled, so building the event costs nothing otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent because the receiving side of the
    /// channel is gone (for example, the logger thread has panicked).
    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }

    /// Body of the `profile:<file>` logger thread.
    ///
    /// Events are buffered until a `Flush` arrives, `capacity` events have
    /// accumulated, or no event arrives for 30 seconds. The batch is then run
    /// through the simulator and every event the simulator tracks is written
    /// to `log_filename`. The thread exits once all senders are dropped and
    /// the remaining events have been written.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be created or written.
    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

        loop {
            let mut disconnected = false;

            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(Event::Flush) => break,
                    Ok(event) => {
                        events.push(event);
                        if events.len() == capacity {
                            break;
                        }
                    }
                    Err(crossbeam_channel::RecvTimeoutError::Timeout) => break,
                    Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                        // Every sender is gone, so no event can ever arrive
                        // again. Without this exit, `recv_timeout` would fail
                        // immediately forever and the outer loop would spin.
                        disconnected = true;
                        break;
                    }
                }
            }

            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();

            if disconnected {
                return;
            }
        }
    }

    /// Body of the `tail:<file>` logger thread.
    ///
    /// Keeps the most recent `capacity` events in a ring buffer. Older events
    /// are still fed to the simulator as they are evicted, so the model stays
    /// consistent, but they are not written. When no event arrives for 30
    /// seconds the buffered tail is written to `log_filename`, preceded by a
    /// `...` line if anything was evicted. `Flush` events are ignored. The
    /// thread exits once all senders are dropped and the tail has been
    /// written.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be created or written.
    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

        loop {
            let mut disconnected = false;

            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(Event::Flush) => continue,
                    Ok(event) => {
                        if events.len() == capacity {
                            if let Some(oldest) = events.pop_front() {
                                state.simulate(&oldest);
                                skipped = true;
                            }
                        }
                        events.push_back(event);
                    }
                    Err(crossbeam_channel::RecvTimeoutError::Timeout) => break,
                    Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                        // Same reasoning as in profile mode: write the tail
                        // one last time and stop instead of spinning.
                        disconnected = true;
                        break;
                    }
                }
            }

            if skipped {
                writeln!(writer, "...").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode every event is dumped, tracked or not.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();

            if disconnected {
                return;
            }
        }
    }

    /// Body of the `all` logger thread: simulates and writes every event to
    /// stderr as it arrives, until all senders are dropped.
    ///
    /// # Panics
    ///
    /// Panics if writing to stderr fails.
    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            // Lock stderr per event so other writers are not blocked while
            // this thread waits for the next event.
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}
/// Activity state of one worker thread as modelled by the log simulator.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    /// Printed as `W`.
    Working,
    /// Printed as `I`.
    Idle,
    /// Printed as `N`.
    Notified,
    /// Printed as `S`.
    Sleeping,
    /// Printed as `T`.
    Terminated,
}

impl State {
    /// Returns the one-character code used for this state in dumped log lines.
    fn letter(&self) -> char {
        match *self {
            Self::Terminated => 'T',
            Self::Sleeping => 'S',
            Self::Notified => 'N',
            Self::Idle => 'I',
            Self::Working => 'W',
        }
    }
}
/// Model of the thread pool rebuilt from the event stream, used to annotate
/// each dumped log line with a snapshot of the pool.
struct SimulatorState {
// Per-worker count of jobs in the local queue, indexed by worker index:
// incremented on `JobPushed`, decremented on `JobPopped` and `JobStolen`.
local_queue_size: Vec<usize>,
// Per-worker activity state, indexed by worker index.
thread_states: Vec<State>,
// Count of injected jobs: grows by `count` on `JobsInjected`, shrinks by
// one on `JobUninjected`.
injector_size: usize,
}
impl SimulatorState {
fn new(num_workers: usize) -> Self {
Self {
local_queue_size: (0..num_workers).map(|_| 0).collect(),
thread_states: (0..num_workers).map(|_| State::Working).collect(),
injector_size: 0,
}
}
fn simulate(&mut self, event: &Event) -> bool {
match *event {
Event::ThreadIdle { worker, .. } => {
assert_eq!(self.thread_states[worker], State::Working);
self.thread_states[worker] = State::Idle;
true
}
Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
self.thread_states[worker] = State::Working;
true
}
Event::ThreadTerminate { worker, .. } => {
self.thread_states[worker] = State::Terminated;
true
}
Event::ThreadSleeping { worker, .. } => {
assert_eq!(self.thread_states[worker], State::Idle);
self.thread_states[worker] = State::Sleeping;
true
}
Event::ThreadAwoken { worker, .. } => {
assert_eq!(self.thread_states[worker], State::Notified);
self.thread_states[worker] = State::Idle;
true
}
Event::JobPushed { worker } => {
self.local_queue_size[worker] += 1;
true
}
Event::JobPopped { worker } => {
self.local_queue_size[worker] -= 1;
true
}
Event::JobStolen { victim, .. } => {
self.local_queue_size[victim] -= 1;
true
}
Event::JobsInjected { count } => {
self.injector_size += count;
true
}
Event::JobUninjected { .. } => {
self.injector_size -= 1;
true
}
Event::ThreadNotify { worker } => {
assert_eq!(self.thread_states[worker], State::Sleeping);
self.thread_states[worker] = State::Notified;
true
}
_ => false,
}
}
fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
let num_idle_threads = self
.thread_states
.iter()
.filter(|s| **s == State::Idle)
.count();
let num_sleeping_threads = self
.thread_states
.iter()
.filter(|s| **s == State::Sleeping)
.count();
let num_notified_threads = self
.thread_states
.iter()
.filter(|s| **s == State::Notified)
.count();
let num_pending_jobs: usize = self.local_queue_size.iter().sum();
write!(w, "{:2},", num_idle_threads)?;
write!(w, "{:2},", num_sleeping_threads)?;
write!(w, "{:2},", num_notified_threads)?;
write!(w, "{:4},", num_pending_jobs)?;
write!(w, "{:4},", self.injector_size)?;
let event_str = format!("{:?}", event);
write!(w, r#""{:60}","#, event_str)?;
for ((i, state), queue_size) in (0..).zip(&self.thread_states).zip(&self.local_queue_size) {
write!(w, " T{:02},{}", i, state.letter(),)?;
if *queue_size > 0 {
write!(w, ",{:03},", queue_size)?;
} else {
write!(w, ", ,")?;
}
}
write!(w, "\n")?;
Ok(())
}
}