Line data Source code
1 : use std::ffi::CStr;
2 : use std::sync::Arc;
3 :
4 : use camino_tempfile::Utf8TempDir;
5 : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
6 : use safekeeper_api::membership::SafekeeperGeneration as Generation;
7 : use tokio::fs::create_dir_all;
8 : use utils::id::{NodeId, TenantTimelineId};
9 : use utils::lsn::Lsn;
10 :
11 : use crate::rate_limit::RateLimiter;
12 : use crate::receive_wal::WalAcceptor;
13 : use crate::safekeeper::{
14 : AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
15 : ProposerElected, SafeKeeper, TermHistory,
16 : };
17 : use crate::send_wal::EndWatch;
18 : use crate::state::{TimelinePersistentState, TimelineState};
19 : use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
20 : use crate::timelines_set::TimelinesSet;
21 : use crate::wal_backup::remote_timeline_path;
22 : use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
23 :
24 : /// A Safekeeper testing or benchmarking environment. All storage lives under a tempdir, which is removed on drop.
25 : pub struct Env {
26 : /// Whether to enable fsync. `make_conf` stores the negation of this flag in the config's `no_sync`.
27 : pub fsync: bool,
28 : /// Benchmark directory; each node's workdir is a `safekeeper-{node_id}` subdirectory of it. Deleted when dropped.
29 : pub tempdir: Utf8TempDir,
30 : }
31 :
32 : impl Env {
33 : /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
34 : /// enable fsyncing.
35 5 : pub fn new(fsync: bool) -> anyhow::Result<Self> {
36 5 : let tempdir = camino_tempfile::tempdir()?;
37 5 : Ok(Self { fsync, tempdir })
38 5 : }
39 :
40 : /// Constructs a Safekeeper config for the given node ID.
41 10 : fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
42 10 : let mut conf = SafeKeeperConf::dummy();
43 10 : conf.my_id = node_id;
44 10 : conf.no_sync = !self.fsync;
45 10 : conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
46 10 : conf
47 10 : }
48 :
49 : /// Constructs a Safekeeper with the given node and tenant/timeline ID.
50 : ///
51 : /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
52 : /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
53 : /// currently possible to construct a timeline using non-file storage since StateSK only accepts
54 : /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
55 5 : pub async fn make_safekeeper(
56 5 : &self,
57 5 : node_id: NodeId,
58 5 : ttid: TenantTimelineId,
59 5 : start_lsn: Lsn,
60 5 : ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
61 5 : let conf = self.make_conf(node_id);
62 5 :
63 5 : let timeline_dir = get_timeline_dir(&conf, &ttid);
64 5 : create_dir_all(&timeline_dir).await?;
65 :
66 5 : let mut pstate = TimelinePersistentState::empty();
67 5 : pstate.tenant_id = ttid.tenant_id;
68 5 : pstate.timeline_id = ttid.timeline_id;
69 :
70 5 : let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
71 5 : let ctrl =
72 5 : control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
73 5 : let state = TimelineState::new(ctrl);
74 5 : let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
75 :
76 : // Emulate an initial election.
77 5 : safekeeper
78 5 : .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
79 5 : generation: Generation::new(0),
80 5 : term: 1,
81 5 : start_streaming_at: start_lsn,
82 5 : term_history: TermHistory(vec![(1, start_lsn).into()]),
83 5 : }))
84 5 : .await?;
85 :
86 5 : Ok(safekeeper)
87 5 : }
88 :
89 : /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
90 : /// manager task.
91 5 : pub async fn make_timeline(
92 5 : &self,
93 5 : node_id: NodeId,
94 5 : ttid: TenantTimelineId,
95 5 : start_lsn: Lsn,
96 5 : ) -> anyhow::Result<Arc<Timeline>> {
97 5 : let conf = Arc::new(self.make_conf(node_id));
98 5 : let timeline_dir = get_timeline_dir(&conf, &ttid);
99 5 : let remote_path = remote_timeline_path(&ttid)?;
100 :
101 5 : let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
102 5 : let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
103 5 :
104 5 : let timeline = Timeline::new(
105 5 : ttid,
106 5 : &timeline_dir,
107 5 : &remote_path,
108 5 : shared_state,
109 5 : conf.clone(),
110 5 : );
111 5 : timeline.bootstrap(
112 5 : &mut timeline.write_shared_state().await,
113 5 : &conf,
114 5 : Arc::new(TimelinesSet::default()), // ignored for now
115 5 : RateLimiter::new(0, 0),
116 5 : );
117 5 : Ok(timeline)
118 5 : }
119 :
120 : // This will be dead code when building a non-benchmark target with the
121 : // benchmarking feature enabled.
122 : #[allow(dead_code)]
123 5 : pub(crate) async fn write_wal(
124 5 : tli: Arc<Timeline>,
125 5 : start_lsn: Lsn,
126 5 : msg_size: usize,
127 5 : msg_count: usize,
128 5 : prefix: &CStr,
129 5 : mut next_record_lsns: Option<&mut Vec<Lsn>>,
130 5 : ) -> anyhow::Result<EndWatch> {
131 5 : let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
132 5 : let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
133 5 :
134 5 : let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
135 5 :
136 5 : WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
137 5 :
138 5 : let prefixlen = prefix.to_bytes_with_nul().len();
139 5 : assert!(msg_size >= prefixlen);
140 5 : let message = vec![0; msg_size - prefixlen];
141 5 :
142 5 : let walgen =
143 5 : &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
144 5 : for _ in 0..msg_count {
145 620 : let (lsn, record) = walgen.next().unwrap();
146 620 : if let Some(ref mut lsns) = next_record_lsns {
147 220 : lsns.push(lsn);
148 400 : }
149 :
150 620 : let req = AppendRequest {
151 620 : h: AppendRequestHeader {
152 620 : generation: Generation::new(0),
153 620 : term: 1,
154 620 : begin_lsn: lsn,
155 620 : end_lsn: lsn + record.len() as u64,
156 620 : commit_lsn: lsn,
157 620 : truncate_lsn: Lsn(0),
158 620 : },
159 620 : wal_data: record,
160 620 : };
161 620 :
162 620 : let end_lsn = req.h.end_lsn;
163 620 :
164 620 : let msg = ProposerAcceptorMessage::AppendRequest(req);
165 620 : msg_tx.send(msg).await?;
166 620 : while let Some(reply) = reply_rx.recv().await {
167 620 : if let AcceptorProposerMessage::AppendResponse(resp) = reply {
168 620 : if resp.flush_lsn >= end_lsn {
169 620 : break;
170 0 : }
171 0 : }
172 : }
173 : }
174 :
175 5 : Ok(end_watch)
176 5 : }
177 : }
|