Line data Source code
1 : use std::ffi::CStr;
2 : use std::sync::Arc;
3 :
4 : use camino_tempfile::Utf8TempDir;
5 : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
6 : use safekeeper_api::membership::SafekeeperGeneration as Generation;
7 : use tokio::fs::create_dir_all;
8 : use utils::id::{NodeId, TenantTimelineId};
9 : use utils::lsn::Lsn;
10 :
11 : use crate::rate_limit::RateLimiter;
12 : use crate::receive_wal::WalAcceptor;
13 : use crate::safekeeper::{
14 : AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
15 : ProposerElected, SafeKeeper, TermHistory,
16 : };
17 : use crate::send_wal::EndWatch;
18 : use crate::state::{TimelinePersistentState, TimelineState};
19 : use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
20 : use crate::timelines_set::TimelinesSet;
21 : use crate::wal_backup::{WalBackup, remote_timeline_path};
22 : use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
23 :
24 : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
25 : pub struct Env {
26 : /// Whether to enable fsync.
27 : pub fsync: bool,
28 : /// Benchmark directory. Deleted when dropped.
29 : pub tempdir: Utf8TempDir,
30 : }
31 :
32 : impl Env {
33 : /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
34 : /// enable fsyncing.
35 5 : pub fn new(fsync: bool) -> anyhow::Result<Self> {
36 5 : let tempdir = camino_tempfile::tempdir()?;
37 5 : Ok(Self { fsync, tempdir })
38 5 : }
39 :
40 : /// Constructs a Safekeeper config for the given node ID.
41 10 : fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
42 10 : let mut conf = SafeKeeperConf::dummy();
43 10 : conf.my_id = node_id;
44 10 : conf.no_sync = !self.fsync;
45 10 : conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
46 10 : conf
47 10 : }
48 :
49 : /// Constructs a Safekeeper with the given node and tenant/timeline ID.
50 : ///
51 : /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
52 : /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
53 : /// currently possible to construct a timeline using non-file storage since StateSK only accepts
54 : /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
55 5 : pub async fn make_safekeeper(
56 5 : &self,
57 5 : node_id: NodeId,
58 5 : ttid: TenantTimelineId,
59 5 : start_lsn: Lsn,
60 5 : ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
61 5 : let conf = self.make_conf(node_id);
62 :
63 5 : let timeline_dir = get_timeline_dir(&conf, &ttid);
64 5 : create_dir_all(&timeline_dir).await?;
65 :
66 5 : let mut pstate = TimelinePersistentState::empty();
67 5 : pstate.tenant_id = ttid.tenant_id;
68 5 : pstate.timeline_id = ttid.timeline_id;
69 :
70 5 : let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
71 5 : let ctrl =
72 5 : control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
73 5 : let state = TimelineState::new(ctrl);
74 5 : let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
75 :
76 : // Emulate an initial election.
77 5 : safekeeper
78 5 : .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
79 5 : generation: Generation::new(0),
80 5 : term: 1,
81 5 : start_streaming_at: start_lsn,
82 5 : term_history: TermHistory(vec![(1, start_lsn).into()]),
83 5 : }))
84 5 : .await?;
85 :
86 5 : Ok(safekeeper)
87 5 : }
88 :
89 : /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
90 : /// manager task.
91 5 : pub async fn make_timeline(
92 5 : &self,
93 5 : node_id: NodeId,
94 5 : ttid: TenantTimelineId,
95 5 : start_lsn: Lsn,
96 5 : ) -> anyhow::Result<Arc<Timeline>> {
97 5 : let conf = Arc::new(self.make_conf(node_id));
98 5 : let timeline_dir = get_timeline_dir(&conf, &ttid);
99 5 : let remote_path = remote_timeline_path(&ttid)?;
100 :
101 5 : let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
102 5 : let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
103 :
104 5 : let wal_backup = Arc::new(WalBackup::new(&conf).await?);
105 :
106 5 : let timeline = Timeline::new(
107 5 : ttid,
108 5 : &timeline_dir,
109 5 : &remote_path,
110 5 : shared_state,
111 5 : conf.clone(),
112 5 : wal_backup.clone(),
113 : );
114 5 : timeline.bootstrap(
115 5 : &mut timeline.write_shared_state().await,
116 5 : &conf,
117 5 : Arc::new(TimelinesSet::default()), // ignored for now
118 5 : RateLimiter::new(0, 0),
119 5 : wal_backup,
120 : );
121 5 : Ok(timeline)
122 5 : }
123 :
124 : // This will be dead code when building a non-benchmark target with the
125 : // benchmarking feature enabled.
126 : #[allow(dead_code)]
127 5 : pub(crate) async fn write_wal(
128 5 : tli: Arc<Timeline>,
129 5 : start_lsn: Lsn,
130 5 : msg_size: usize,
131 5 : msg_count: usize,
132 5 : prefix: &CStr,
133 5 : mut next_record_lsns: Option<&mut Vec<Lsn>>,
134 5 : ) -> anyhow::Result<EndWatch> {
135 5 : let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
136 5 : let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
137 :
138 5 : let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
139 :
140 5 : WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
141 :
142 5 : let prefixlen = prefix.to_bytes_with_nul().len();
143 5 : assert!(msg_size >= prefixlen);
144 5 : let message = vec![0; msg_size - prefixlen];
145 :
146 5 : let walgen =
147 5 : &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
148 5 : for _ in 0..msg_count {
149 620 : let (lsn, record) = walgen.next().unwrap();
150 620 : if let Some(ref mut lsns) = next_record_lsns {
151 220 : lsns.push(lsn);
152 400 : }
153 :
154 620 : let req = AppendRequest {
155 620 : h: AppendRequestHeader {
156 620 : generation: Generation::new(0),
157 620 : term: 1,
158 620 : begin_lsn: lsn,
159 620 : end_lsn: lsn + record.len() as u64,
160 620 : commit_lsn: lsn,
161 620 : truncate_lsn: Lsn(0),
162 620 : },
163 620 : wal_data: record,
164 620 : };
165 :
166 620 : let end_lsn = req.h.end_lsn;
167 :
168 620 : let msg = ProposerAcceptorMessage::AppendRequest(req);
169 620 : msg_tx.send(msg).await?;
170 620 : while let Some(reply) = reply_rx.recv().await {
171 620 : if let AcceptorProposerMessage::AppendResponse(resp) = reply {
172 620 : if resp.flush_lsn >= end_lsn {
173 620 : break;
174 0 : }
175 0 : }
176 : }
177 : }
178 :
179 5 : Ok(end_watch)
180 5 : }
181 : }
|