Line data Source code
1 : use std::sync::Arc;
2 :
3 : use crate::rate_limit::RateLimiter;
4 : use crate::safekeeper::{ProposerAcceptorMessage, ProposerElected, SafeKeeper, TermHistory};
5 : use crate::state::{TimelinePersistentState, TimelineState};
6 : use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
7 : use crate::timelines_set::TimelinesSet;
8 : use crate::wal_backup::remote_timeline_path;
9 : use crate::{control_file, wal_storage, SafeKeeperConf};
10 : use camino_tempfile::Utf8TempDir;
11 : use tokio::fs::create_dir_all;
12 : use utils::id::{NodeId, TenantTimelineId};
13 : use utils::lsn::Lsn;
14 :
15 : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
16 : pub struct Env {
17 : /// Whether to enable fsync.
18 : pub fsync: bool,
19 : /// Benchmark directory. Deleted when dropped.
20 : pub tempdir: Utf8TempDir,
21 : }
22 :
23 : impl Env {
24 : /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
25 : /// enable fsyncing.
26 0 : pub fn new(fsync: bool) -> anyhow::Result<Self> {
27 0 : let tempdir = camino_tempfile::tempdir()?;
28 0 : Ok(Self { fsync, tempdir })
29 0 : }
30 :
31 : /// Constructs a Safekeeper config for the given node ID.
32 0 : fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
33 0 : let mut conf = SafeKeeperConf::dummy();
34 0 : conf.my_id = node_id;
35 0 : conf.no_sync = !self.fsync;
36 0 : conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
37 0 : conf
38 0 : }
39 :
40 : /// Constructs a Safekeeper with the given node and tenant/timeline ID.
41 : ///
42 : /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
43 : /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
44 : /// currently possible to construct a timeline using non-file storage since StateSK only accepts
45 : /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
46 0 : pub async fn make_safekeeper(
47 0 : &self,
48 0 : node_id: NodeId,
49 0 : ttid: TenantTimelineId,
50 0 : start_lsn: Lsn,
51 0 : ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
52 0 : let conf = self.make_conf(node_id);
53 0 :
54 0 : let timeline_dir = get_timeline_dir(&conf, &ttid);
55 0 : create_dir_all(&timeline_dir).await?;
56 :
57 0 : let mut pstate = TimelinePersistentState::empty();
58 0 : pstate.tenant_id = ttid.tenant_id;
59 0 : pstate.timeline_id = ttid.timeline_id;
60 :
61 0 : let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
62 0 : let ctrl =
63 0 : control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
64 0 : let state = TimelineState::new(ctrl);
65 0 : let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
66 :
67 : // Emulate an initial election.
68 0 : safekeeper
69 0 : .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
70 0 : term: 1,
71 0 : start_streaming_at: start_lsn,
72 0 : term_history: TermHistory(vec![(1, start_lsn).into()]),
73 0 : timeline_start_lsn: start_lsn,
74 0 : }))
75 0 : .await?;
76 :
77 0 : Ok(safekeeper)
78 0 : }
79 :
80 : /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
81 : /// manager task.
82 0 : pub async fn make_timeline(
83 0 : &self,
84 0 : node_id: NodeId,
85 0 : ttid: TenantTimelineId,
86 0 : start_lsn: Lsn,
87 0 : ) -> anyhow::Result<Arc<Timeline>> {
88 0 : let conf = Arc::new(self.make_conf(node_id));
89 0 : let timeline_dir = get_timeline_dir(&conf, &ttid);
90 0 : let remote_path = remote_timeline_path(&ttid)?;
91 :
92 0 : let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
93 0 : let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
94 0 :
95 0 : let timeline = Timeline::new(
96 0 : ttid,
97 0 : &timeline_dir,
98 0 : &remote_path,
99 0 : shared_state,
100 0 : conf.clone(),
101 0 : );
102 0 : timeline.bootstrap(
103 0 : &mut timeline.write_shared_state().await,
104 0 : &conf,
105 0 : Arc::new(TimelinesSet::default()), // ignored for now
106 0 : RateLimiter::new(0, 0),
107 0 : );
108 0 : Ok(timeline)
109 0 : }
110 : }
|