Line data Source code
1 : use tokio_util::sync::CancellationToken;
2 :
3 0 : #[derive(thiserror::Error, Debug)]
4 : pub enum YieldingLoopError {
5 : #[error("Cancelled")]
6 : Cancelled,
7 : }
8 :
9 : /// Helper for long synchronous loops, e.g. over all tenants in the system. Periodically
10 : /// yields to avoid blocking the executor, and after resuming checks the provided
11 : /// cancellation token to drop out promptly on shutdown.
12 : #[inline(always)]
13 3594 : pub async fn yielding_loop<I, T, F>(
14 3594 : interval: usize,
15 3594 : cancel: &CancellationToken,
16 3594 : iter: I,
17 3594 : mut visitor: F,
18 3594 : ) -> Result<(), YieldingLoopError>
19 3594 : where
20 3594 : I: Iterator<Item = T>,
21 3594 : F: FnMut(T),
22 3594 : {
23 3594 : for (i, item) in iter.enumerate() {
24 922 : visitor(item);
25 922 :
26 922 : if i + 1 % interval == 0 {
27 0 : tokio::task::yield_now().await;
28 0 : if cancel.is_cancelled() {
29 0 : return Err(YieldingLoopError::Cancelled);
30 0 : }
31 922 : }
32 : }
33 :
34 3594 : Ok(())
35 3594 : }
|