Line data Source code
1 : use std::{num::NonZeroUsize, sync::Arc};
2 :
3 : use crate::tenant::ephemeral_file;
4 :
5 0 : #[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
6 : #[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
7 : pub enum L0FlushConfig {
8 : PageCached,
9 : #[serde(rename_all = "snake_case")]
10 : Direct {
11 : max_concurrency: NonZeroUsize,
12 : },
13 : }
14 :
15 : impl Default for L0FlushConfig {
16 376 : fn default() -> Self {
17 376 : Self::Direct {
18 376 : // TODO: using num_cpus results in different peak memory usage on different instance types.
19 376 : max_concurrency: NonZeroUsize::new(usize::max(1, num_cpus::get())).unwrap(),
20 376 : }
21 376 : }
22 : }
23 :
24 : #[derive(Clone)]
25 : pub struct L0FlushGlobalState(Arc<Inner>);
26 :
27 : pub(crate) enum Inner {
28 : PageCached,
29 : Direct { semaphore: tokio::sync::Semaphore },
30 : }
31 :
32 : impl L0FlushGlobalState {
33 174 : pub fn new(config: L0FlushConfig) -> Self {
34 174 : match config {
35 0 : L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
36 174 : L0FlushConfig::Direct { max_concurrency } => {
37 174 : let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
38 174 : Self(Arc::new(Inner::Direct { semaphore }))
39 : }
40 : }
41 174 : }
42 :
43 1126 : pub(crate) fn inner(&self) -> &Arc<Inner> {
44 1126 : &self.0
45 1126 : }
46 : }
47 :
48 : impl L0FlushConfig {
49 1258 : pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
50 1258 : use L0FlushConfig::*;
51 1258 : match self {
52 0 : PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
53 1258 : Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
54 : }
55 1258 : }
56 : }
|