Line data Source code
1 : use tokio_util::sync::CancellationToken;
2 :
3 0 : #[derive(thiserror::Error, Debug)]
4 : pub enum YieldingLoopError {
5 : #[error("Cancelled")]
6 : Cancelled,
7 : }
8 :
9 : /// Helper for long synchronous loops, e.g. over all tenants in the system.
10 : ///
11 : /// Periodically yields to avoid blocking the executor, and after resuming
12 : /// checks the provided cancellation token to drop out promptly on shutdown.
13 : #[inline(always)]
14 0 : pub async fn yielding_loop<I, T, F>(
15 0 : interval: usize,
16 0 : cancel: &CancellationToken,
17 0 : iter: I,
18 0 : mut visitor: F,
19 0 : ) -> Result<(), YieldingLoopError>
20 0 : where
21 0 : I: Iterator<Item = T>,
22 0 : F: FnMut(T),
23 0 : {
24 0 : for (i, item) in iter.enumerate() {
25 0 : visitor(item);
26 0 :
27 0 : if (i + 1) % interval == 0 {
28 0 : tokio::task::yield_now().await;
29 0 : if cancel.is_cancelled() {
30 0 : return Err(YieldingLoopError::Cancelled);
31 0 : }
32 0 : }
33 : }
34 :
35 0 : Ok(())
36 0 : }
|