Struct rusty_pool::ThreadPool
pub struct ThreadPool { /* fields omitted */ }
Self-growing/shrinking ThreadPool implementation based on crossbeam's multi-producer, multi-consumer channels that enables awaiting the result of a task and offers async support.
This ThreadPool has two different pool sizes: a core pool size filled with threads that live for as long as the channel, and a max pool size which describes the maximum number of worker threads that may live at the same time. The additional non-core threads have a keep_alive time, specified when creating the ThreadPool, that defines how long such threads may remain idle without receiving any work before giving up and terminating their work loop.
This ThreadPool does not spawn any threads until a task is submitted to it. It will then create a new thread for each task until the core pool size is reached. After that, a new thread will only be created upon an execute() call if the current worker count is lower than the max pool size and there are no idle threads.
Functions like evaluate() and complete() return a JoinHandle that may be used to await the result of a submitted task or future. JoinHandles may be sent to the thread pool to create a task that blocks a worker thread until it receives the result of the other task and then operates on that result. If the task panics, the JoinHandle receives a cancellation error. This is implemented using a futures oneshot channel to communicate with the worker thread.
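For example, a hypothetical sketch of chaining two tasks by moving one task's JoinHandle into another task submitted to the pool (the closure bodies are purely illustrative; only evaluate() and await_complete() are part of the crate's API):

use rusty_pool::ThreadPool;

let pool = ThreadPool::default();

// First task produces a value.
let first = pool.evaluate(|| 21);

// Second task blocks a worker until the first task's result is available,
// then operates on that result.
let second = pool.evaluate(move || first.await_complete() * 2);

assert_eq!(second.await_complete(), 42);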
This ThreadPool may be used as a futures executor if the “async” feature is enabled, which is the case by default. The “async” feature includes the spawn() and try_spawn() functions, which create a task that polls the future one step at a time and creates a waker that re-submits the future to the pool when it can make progress. Without the “async” feature, futures can simply be executed to completion using the complete() function, which blocks a worker thread until the future has been polled to completion.
The “async” feature can be disabled if not needed by adding the following to your Cargo dependency:
[dependencies.rusty_pool]
default-features = false
version = "*"
When creating a new worker, this ThreadPool always re-checks whether the new worker is still required before spawning a thread and passing it the submitted task, in case an idle thread has opened up in the meantime or another thread has already created the worker. If the re-check fails for a core worker, the pool will try creating a new non-core worker before deciding no new worker is needed. Panicking workers are always cloned and replaced.
Locks are only used for the join functions to lock the Condvar; apart from that, this ThreadPool implementation fully relies on crossbeam and atomic operations.
This ThreadPool decides whether it is currently idle (and should fast-return join attempts) by comparing the total worker count to the idle worker count. These are two u32 values stored in one AtomicU64, making sure that if both are updated they may be updated in a single atomic operation.
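A minimal sketch of how two u32 counters can be packed into a single AtomicU64 so both can be read and updated atomically (the exact bit layout is an assumption for illustration, not taken from the crate's source):

use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical layout: total worker count in the upper 32 bits,
// idle worker count in the lower 32 bits.
struct WorkerCounts(AtomicU64);

impl WorkerCounts {
    fn new() -> Self {
        WorkerCounts(AtomicU64::new(0))
    }

    // Increment both counters in one atomic operation,
    // e.g. when a new worker starts out idle.
    fn add_idle_worker(&self) {
        self.0.fetch_add((1u64 << 32) | 1, Ordering::SeqCst);
    }

    // Read both counters from a single atomic load.
    fn counts(&self) -> (u32, u32) {
        let val = self.0.load(Ordering::SeqCst);
        ((val >> 32) as u32, val as u32)
    }

    // The pool is idle when every live worker is idle.
    fn is_idle(&self) -> bool {
        let (total, idle) = self.counts();
        total == idle
    }
}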
The thread pool and its crossbeam channel can be destroyed by using the shutdown function. This does not stop tasks that are already running, but each worker thread will terminate the next time it tries to fetch work from the channel.
Usage
Create a new ThreadPool:
use rusty_pool::Builder;
use rusty_pool::ThreadPool;
use std::time::Duration;

// Create default `ThreadPool` configuration with the number of CPUs as core pool size
let pool = ThreadPool::default();

// Create a `ThreadPool` with default naming:
let pool2 = ThreadPool::new(5, 50, Duration::from_secs(60));

// Create a `ThreadPool` with a custom name:
let pool3 = ThreadPool::new_named(String::from("my_pool"), 5, 50, Duration::from_secs(60));

// Using the Builder struct:
let pool4 = Builder::new().core_size(5).max_size(50).build();
Submit a closure for execution in the ThreadPool:
use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;

let pool = ThreadPool::default();
pool.execute(|| {
    thread::sleep(Duration::from_secs(5));
    print!("hello");
});
Submit a task and await the result:
use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;

let pool = ThreadPool::default();
let handle = pool.evaluate(|| {
    thread::sleep(Duration::from_secs(5));
    return 4;
});
let result = handle.await_complete();
assert_eq!(result, 4);
Spawn futures using the ThreadPool:
use rusty_pool::ThreadPool;
use std::sync::{Arc, atomic::{AtomicI32, Ordering}};

async fn some_async_fn(x: i32, y: i32) -> i32 {
    x + y
}

async fn other_async_fn(x: i32, y: i32) -> i32 {
    x - y
}

let pool = ThreadPool::default();

// simply complete the future by blocking a worker until the future has been completed
let handle = pool.complete(async {
    let a = some_async_fn(4, 6).await; // 10
    let b = some_async_fn(a, 3).await; // 13
    let c = other_async_fn(b, a).await; // 3
    some_async_fn(c, 5).await // 8
});
assert_eq!(handle.await_complete(), 8);

// spawn a future and create a waker that automatically re-submits itself to the thread pool
// when it is ready to make progress; this requires the "async" feature which is enabled by default
let count = Arc::new(AtomicI32::new(0));
let clone = count.clone();
pool.spawn(async move {
    let a = some_async_fn(3, 6).await; // 9
    let b = other_async_fn(a, 4).await; // 5
    let c = some_async_fn(b, 7).await; // 12
    clone.fetch_add(c, Ordering::SeqCst);
});
pool.join();
assert_eq!(count.load(Ordering::SeqCst), 12);
Join and shut down the ThreadPool:
use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;
use std::sync::{Arc, atomic::{AtomicI32, Ordering}};

let pool = ThreadPool::default();
for _ in 0..10 {
    pool.execute(|| thread::sleep(Duration::from_secs(10)))
}

// wait for all threads to become idle, i.e. for all tasks to be completed, including tasks
// added by other threads after join() is called by this thread, or for the timeout to be reached
pool.join_timeout(Duration::from_secs(5));

let count = Arc::new(AtomicI32::new(0));
for _ in 0..15 {
    let clone = count.clone();
    pool.execute(move || {
        thread::sleep(Duration::from_secs(5));
        clone.fetch_add(1, Ordering::SeqCst);
    });
}

// shut down and drop the only instance of this `ThreadPool` (no clones), causing the channel
// to be broken, leading all workers to exit after completing their current work,
// and wait for all workers to become idle, i.e. finish their work
pool.shutdown_join();
assert_eq!(count.load(Ordering::SeqCst), 15);
Implementations
impl ThreadPool

pub fn new(core_size: u32, max_size: u32, keep_alive: Duration) -> Self
Construct a new ThreadPool with the specified core pool size, max pool size and keep_alive time for non-core threads. This function does not spawn any threads. This ThreadPool will receive a default name in the following format: “rusty_pool_” + pool number.
core_size specifies the number of threads to keep alive for as long as the ThreadPool exists and its channel remains connected.

max_size specifies the maximum number of worker threads that may exist at the same time.

keep_alive specifies the duration for which to keep non-core pool worker threads alive while they do not receive any work.
Panics
This function will panic if max_size is 0 or lower than core_size.
pub fn new_named(
    name: String,
    core_size: u32,
    max_size: u32,
    keep_alive: Duration
) -> Self
Construct a new ThreadPool with the specified name, core pool size, max pool size and keep_alive time for non-core threads. This function does not spawn any threads.

name: the name of the ThreadPool that will be used as prefix for each thread.
core_size specifies the number of threads to keep alive for as long as the ThreadPool exists and its channel remains connected.

max_size specifies the maximum number of worker threads that may exist at the same time.

keep_alive specifies the duration for which to keep non-core pool worker threads alive while they do not receive any work.
Panics
This function will panic if max_size is 0 or lower than core_size.
pub fn get_current_worker_count(&self) -> u32
Get the number of live workers, including all workers waiting for work or executing tasks.
This counter is incremented when creating a new worker, even before re-checking whether the worker is still needed. Once the worker count is updated, the previous value returned by the atomic operation is analysed to check whether it still represents a value that would require a new worker. If that is not the case, this counter is decremented again and the worker never spawns a thread or starts its work loop. Otherwise, this counter is decremented when a worker reaches the end of its work loop, which for non-core threads happens if they do not receive any work during their keep_alive time; for core threads this only happens once the channel is disconnected.
pub fn get_idle_worker_count(&self) -> u32
Get the number of workers currently waiting for work. Those threads are currently polling from the crossbeam receiver. Core threads wait indefinitely and might remain in this state until the ThreadPool is dropped. The remaining threads give up after waiting for the specified keep_alive time.
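A small usage sketch inspecting both counters (the exact values depend on timing and are not deterministic):

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;

let pool = ThreadPool::default();
pool.execute(|| thread::sleep(Duration::from_millis(100)));

// Includes workers that are busy as well as workers polling for work.
println!("live workers: {}", pool.get_current_worker_count());
// Only workers currently waiting on the crossbeam receiver.
println!("idle workers: {}", pool.get_idle_worker_count());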
pub fn execute<T: Task<()> + 'static>(&self, task: T)
Send a new task to the worker threads. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.
After constructing the worker but before spawning its thread, this function checks again whether the new worker is still needed. It does so by analysing the old value returned by atomically incrementing the worker counter and checking whether the worker is still required or another thread has already created it; for non-core threads it additionally checks whether there are idle threads now. If the re-check condition still applies, the new worker receives the submitted task directly as its first task and starts executing. If creating a new core worker fails the re-check, the next step is to try creating a non-core worker instead. If all checks fail, the task is simply sent to the main channel instead.
Panics
This function might panic if try_execute returns an error, which happens when the crossbeam channel has been closed unexpectedly. This should never occur under normal circumstances using safe code, as shutting down the ThreadPool consumes ownership and the crossbeam channel is never dropped unless the ThreadPool is dropped.
pub fn try_execute<T: Task<()> + 'static>(
    &self,
    task: T
) -> Result<(), SendError<Box<dyn FnOnce() + Send + 'static>>>
Send a new task to the worker threads. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.
After constructing the worker but before spawning its thread, this function checks again whether the new worker is still needed. It does so by analysing the old value returned by atomically incrementing the worker counter and checking whether the worker is still required or another thread has already created it; for non-core threads it additionally checks whether there are idle threads now. If the re-check condition still applies, the new worker receives the submitted task directly as its first task and starts executing. If creating a new core worker fails the re-check, the next step is to try creating a non-core worker instead. If all checks fail, the task is simply sent to the main channel instead.
Errors
This function might return crossbeam_channel::SendError if the sender was dropped unexpectedly.
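A minimal usage sketch handling the error case (under normal circumstances the submission succeeds):

use rusty_pool::ThreadPool;

let pool = ThreadPool::default();

// try_execute returns Err(SendError) instead of panicking if the channel is broken.
if pool.try_execute(|| println!("hello from the pool")).is_err() {
    eprintln!("the pool's channel has been disconnected");
}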
pub fn evaluate<R: Send + 'static, T: Task<R> + 'static>(
    &self,
    task: T
) -> JoinHandle<R>
Send a new task to the worker threads and return a JoinHandle that may be used to await the result. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.
After constructing the worker but before spawning its thread, this function checks again whether the new worker is still needed. It does so by analysing the old value returned by atomically incrementing the worker counter and checking whether the worker is still required or another thread has already created it; for non-core threads it additionally checks whether there are idle threads now. If the re-check condition still applies, the new worker receives the submitted task directly as its first task and starts executing. If creating a new core worker fails the re-check, the next step is to try creating a non-core worker instead. If all checks fail, the task is simply sent to the main channel instead.
Panics
This function might panic if try_execute returns an error, which happens when the crossbeam channel has been closed unexpectedly. This should never occur under normal circumstances using safe code, as shutting down the ThreadPool consumes ownership and the crossbeam channel is never dropped unless the ThreadPool is dropped.
pub fn try_evaluate<R: Send + 'static, T: Task<R> + 'static>(
    &self,
    task: T
) -> Result<JoinHandle<R>, SendError<Box<dyn FnOnce() + Send + 'static>>>
Send a new task to the worker threads and return a JoinHandle that may be used to await the result. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.
After constructing the worker but before spawning its thread, this function checks again whether the new worker is still needed. It does so by analysing the old value returned by atomically incrementing the worker counter and checking whether the worker is still required or another thread has already created it; for non-core threads it additionally checks whether there are idle threads now. If the re-check condition still applies, the new worker receives the submitted task directly as its first task and starts executing. If creating a new core worker fails the re-check, the next step is to try creating a non-core worker instead. If all checks fail, the task is simply sent to the main channel instead.
Errors
This function might return crossbeam_channel::SendError if the sender was dropped unexpectedly.
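A minimal usage sketch handling both the success and the error case:

use rusty_pool::ThreadPool;

let pool = ThreadPool::default();

match pool.try_evaluate(|| 2 + 2) {
    Ok(handle) => assert_eq!(handle.await_complete(), 4),
    Err(_) => eprintln!("the pool's channel has been disconnected"),
}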
pub fn complete<R: Send + 'static>(
    &self,
    future: impl Future<Output = R> + 'static + Send
) -> JoinHandle<R>
Send a task to the ThreadPool that completes the given Future and return a JoinHandle that may be used to await the result. This function simply calls evaluate() with a closure that calls block_on with the provided future.
Panic
This function panics if the task fails to be sent to the ThreadPool due to the channel being broken.
pub fn try_complete<R: Send + 'static>(
    &self,
    future: impl Future<Output = R> + 'static + Send
) -> Result<JoinHandle<R>, SendError<Box<dyn FnOnce() + Send + 'static>>>
Send a task to the ThreadPool that completes the given Future and return a JoinHandle that may be used to await the result. This function simply calls try_evaluate() with a closure that calls block_on with the provided future.
Errors
This function returns crossbeam_channel::SendError if the task fails to be sent to the ThreadPool due to the channel being broken.
pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send)
Submit a Future to be polled by this ThreadPool. Unlike complete(), this does not block a worker until the Future has been completed but polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awoken. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls execute().
Panic
This function panics if the task fails to be sent to the ThreadPool due to the channel being broken.
pub fn try_spawn(
    &self,
    future: impl Future<Output = ()> + 'static + Send
) -> Result<(), SendError<Box<dyn FnOnce() + Send + 'static>>>
Submit a Future to be polled by this ThreadPool. Unlike try_complete(), this does not block a worker until the Future has been completed but polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awoken. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls try_execute().
Errors
This function returns crossbeam_channel::SendError if the task fails to be sent to the ThreadPool due to the channel being broken.
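A minimal usage sketch (the async function is purely illustrative; only try_spawn(), join() and the “async” feature belong to the crate):

use rusty_pool::ThreadPool;

async fn do_work() {
    // hypothetical async work
}

let pool = ThreadPool::default();

// try_spawn returns Err(SendError) instead of panicking if the channel is broken.
if pool.try_spawn(do_work()).is_err() {
    eprintln!("the pool's channel has been disconnected");
}
pool.join();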
pub fn spawn_await<R: Send + 'static>(
    &self,
    future: impl Future<Output = R> + 'static + Send
) -> JoinHandle<R>
Create a top-level Future that awaits the provided Future and then sends the result to the returned JoinHandle. Unlike complete(), this does not block a worker until the Future has been completed but polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awoken. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls execute().

This enables awaiting the final result outside of an async context, like complete(), while still polling the future lazily instead of eagerly blocking a worker until the future is done.
Panic
This function panics if the task fails to be sent to the ThreadPool due to the channel being broken.
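A minimal usage sketch (the async function is purely illustrative; only spawn_await() and await_complete() are part of the crate's API):

use rusty_pool::ThreadPool;

async fn add(x: i32, y: i32) -> i32 {
    x + y
}

let pool = ThreadPool::default();

// The future is polled lazily by the pool, but the final result can still be
// awaited synchronously through the returned JoinHandle.
let handle = pool.spawn_await(async { add(20, 22).await });
assert_eq!(handle.await_complete(), 42);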
pub fn try_spawn_await<R: Send + 'static>(
    &self,
    future: impl Future<Output = R> + 'static + Send
) -> Result<JoinHandle<R>, SendError<Box<dyn FnOnce() + Send + 'static>>>
Create a top-level Future that awaits the provided Future and then sends the result to the returned JoinHandle. Unlike try_complete(), this does not block a worker until the Future has been completed but polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awoken. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls try_execute().

This enables awaiting the final result outside of an async context, like complete(), while still polling the future lazily instead of eagerly blocking a worker until the future is done.
Errors
This function returns crossbeam_channel::SendError if the task fails to be sent to the ThreadPool due to the channel being broken.
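A minimal usage sketch handling both outcomes:

use rusty_pool::ThreadPool;

let pool = ThreadPool::default();

match pool.try_spawn_await(async { 40 + 2 }) {
    Ok(handle) => assert_eq!(handle.await_complete(), 42),
    Err(_) => eprintln!("the pool's channel has been disconnected"),
}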
pub fn join(&self)
Blocks the current thread until there aren’t any non-idle threads anymore. This includes work started after calling this function. This function blocks until the next time this ThreadPool completes all of its work, except if all threads are idle and the channel is empty at the time of calling this function, in which case it will fast-return.

This utilizes a Condvar that is notified by workers when they complete a job and notice that the channel is currently empty and it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it’s the last thread to become idle).
pub fn join_timeout(&self, time_out: Duration)
Blocks the current thread until there aren’t any non-idle threads anymore or until the specified time_out Duration passes, whichever happens first. This includes work started after calling this function. This function blocks until the next time this ThreadPool completes all of its work (or until the time_out is reached), except if all threads are idle and the channel is empty at the time of calling this function, in which case it will fast-return.

This utilizes a Condvar that is notified by workers when they complete a job and notice that the channel is currently empty and it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it’s the last thread to become idle).
pub fn shutdown(self)
Destroy this ThreadPool by claiming ownership and dropping the value, causing the Sender to drop and thus disconnecting the channel. Threads in this pool that are currently executing a task will finish what they’re doing until they check the channel, discover that it has been disconnected from the sender and thus terminate their work loop. If other clones of this ThreadPool exist, the sender will remain intact and tasks submitted to those clones will succeed; this includes pending AsyncTask instances, as they hold an owned clone of the ThreadPool to re-submit awakened futures.
pub fn shutdown_join(self)
Destroy this ThreadPool by claiming ownership and dropping the value, causing the Sender to drop and thus disconnecting the channel. Threads in this pool that are currently executing a task will finish what they’re doing until they check the channel, discover that it has been disconnected from the sender and thus terminate their work loop. If other clones of this ThreadPool exist, the sender will remain intact and tasks submitted to those clones will succeed; this includes pending AsyncTask instances, as they hold an owned clone of the ThreadPool to re-submit awakened futures.
This function additionally joins all workers after dropping the pool to
wait for all work to finish.
Blocks the current thread until there aren’t any non-idle threads anymore. This function blocks until this ThreadPool completes all of its work, except if all threads are idle and the channel is empty at the time of calling this function, in which case the join will fast-return. If other live clones of this ThreadPool exist, this behaves the same as calling join on a live ThreadPool, as tasks submitted to one of the clones will be joined as well.

The join utilizes a Condvar that is notified by workers when they complete a job and notice that the channel is currently empty and it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it’s the last thread to become idle).
pub fn shutdown_join_timeout(self, timeout: Duration)
Destroy this ThreadPool by claiming ownership and dropping the value, causing the Sender to drop and thus disconnecting the channel. Threads in this pool that are currently executing a task will finish what they’re doing until they check the channel, discover that it has been disconnected from the sender and thus terminate their work loop. If other clones of this ThreadPool exist, the sender will remain intact and tasks submitted to those clones will succeed; this includes pending AsyncTask instances, as they hold an owned clone of the ThreadPool to re-submit awakened futures.
This function additionally joins all workers after dropping the pool to
wait for all work to finish.
Blocks the current thread until there aren’t any non-idle threads anymore or until the specified timeout Duration passes, whichever happens first. This function blocks until this ThreadPool completes all of its work (or until the timeout is reached), except if all threads are idle and the channel is empty at the time of calling this function, in which case the join will fast-return. If other live clones of this ThreadPool exist, this behaves the same as calling join on a live ThreadPool, as tasks submitted to one of the clones will be joined as well.

The join utilizes a Condvar that is notified by workers when they complete a job and notice that the channel is currently empty and it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it’s the last thread to become idle).
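A minimal usage sketch that shuts down the pool but waits at most a bounded amount of time for running work to finish:

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;

let pool = ThreadPool::default();
pool.execute(|| thread::sleep(Duration::from_secs(10)));

// Drop the pool, disconnect the channel and wait at most 2 seconds for workers to finish.
pool.shutdown_join_timeout(Duration::from_secs(2));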
pub fn get_name(&self) -> &str
Return the name of this pool, used as prefix for each worker thread.
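For example:

use rusty_pool::ThreadPool;
use std::time::Duration;

let pool = ThreadPool::new_named(String::from("my_pool"), 5, 50, Duration::from_secs(60));
assert_eq!(pool.get_name(), "my_pool");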
Trait Implementations
impl Clone for ThreadPool

fn clone(&self) -> ThreadPool
pub fn clone_from(&mut self, source: &Self)
impl Default for ThreadPool