Line data Source code
1 : use std::sync::Arc;
2 :
3 : use camino_tempfile::Utf8TempDir;
4 : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
5 : use safekeeper_api::membership::SafekeeperGeneration as Generation;
6 : use tokio::fs::create_dir_all;
7 : use utils::id::{NodeId, TenantTimelineId};
8 : use utils::lsn::Lsn;
9 :
10 : use crate::rate_limit::RateLimiter;
11 : use crate::receive_wal::WalAcceptor;
12 : use crate::safekeeper::{
13 : AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
14 : ProposerElected, SafeKeeper, TermHistory,
15 : };
16 : use crate::send_wal::EndWatch;
17 : use crate::state::{TimelinePersistentState, TimelineState};
18 : use crate::timeline::{SharedState, StateSK, Timeline, get_timeline_dir};
19 : use crate::timelines_set::TimelinesSet;
20 : use crate::wal_backup::remote_timeline_path;
21 : use crate::{SafeKeeperConf, control_file, receive_wal, wal_storage};
22 :
23 : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
24 : pub struct Env {
25 : /// Whether to enable fsync.
26 : pub fsync: bool,
27 : /// Benchmark directory. Deleted when dropped.
28 : pub tempdir: Utf8TempDir,
29 : }
30 :
31 : impl Env {
32 : /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
33 : /// enable fsyncing.
34 4 : pub fn new(fsync: bool) -> anyhow::Result<Self> {
35 4 : let tempdir = camino_tempfile::tempdir()?;
36 4 : Ok(Self { fsync, tempdir })
37 4 : }
38 :
39 : /// Constructs a Safekeeper config for the given node ID.
40 8 : fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
41 8 : let mut conf = SafeKeeperConf::dummy();
42 8 : conf.my_id = node_id;
43 8 : conf.no_sync = !self.fsync;
44 8 : conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
45 8 : conf
46 8 : }
47 :
48 : /// Constructs a Safekeeper with the given node and tenant/timeline ID.
49 : ///
50 : /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
51 : /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
52 : /// currently possible to construct a timeline using non-file storage since StateSK only accepts
53 : /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
54 4 : pub async fn make_safekeeper(
55 4 : &self,
56 4 : node_id: NodeId,
57 4 : ttid: TenantTimelineId,
58 4 : start_lsn: Lsn,
59 4 : ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
60 4 : let conf = self.make_conf(node_id);
61 4 :
62 4 : let timeline_dir = get_timeline_dir(&conf, &ttid);
63 4 : create_dir_all(&timeline_dir).await?;
64 :
65 4 : let mut pstate = TimelinePersistentState::empty();
66 4 : pstate.tenant_id = ttid.tenant_id;
67 4 : pstate.timeline_id = ttid.timeline_id;
68 :
69 4 : let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
70 4 : let ctrl =
71 4 : control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
72 4 : let state = TimelineState::new(ctrl);
73 4 : let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
74 :
75 : // Emulate an initial election.
76 4 : safekeeper
77 4 : .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
78 4 : generation: Generation::new(0),
79 4 : term: 1,
80 4 : start_streaming_at: start_lsn,
81 4 : term_history: TermHistory(vec![(1, start_lsn).into()]),
82 4 : }))
83 4 : .await?;
84 :
85 4 : Ok(safekeeper)
86 4 : }
87 :
88 : /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
89 : /// manager task.
90 4 : pub async fn make_timeline(
91 4 : &self,
92 4 : node_id: NodeId,
93 4 : ttid: TenantTimelineId,
94 4 : start_lsn: Lsn,
95 4 : ) -> anyhow::Result<Arc<Timeline>> {
96 4 : let conf = Arc::new(self.make_conf(node_id));
97 4 : let timeline_dir = get_timeline_dir(&conf, &ttid);
98 4 : let remote_path = remote_timeline_path(&ttid)?;
99 :
100 4 : let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
101 4 : let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
102 4 :
103 4 : let timeline = Timeline::new(
104 4 : ttid,
105 4 : &timeline_dir,
106 4 : &remote_path,
107 4 : shared_state,
108 4 : conf.clone(),
109 4 : );
110 4 : timeline.bootstrap(
111 4 : &mut timeline.write_shared_state().await,
112 4 : &conf,
113 4 : Arc::new(TimelinesSet::default()), // ignored for now
114 4 : RateLimiter::new(0, 0),
115 4 : );
116 4 : Ok(timeline)
117 4 : }
118 :
119 : // This will be dead code when building a non-benchmark target with the
120 : // benchmarking feature enabled.
121 : #[allow(dead_code)]
122 4 : pub(crate) async fn write_wal(
123 4 : tli: Arc<Timeline>,
124 4 : start_lsn: Lsn,
125 4 : msg_size: usize,
126 4 : msg_count: usize,
127 4 : mut next_record_lsns: Option<&mut Vec<Lsn>>,
128 4 : ) -> anyhow::Result<EndWatch> {
129 4 : let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
130 4 : let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
131 4 :
132 4 : let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
133 4 :
134 4 : WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
135 4 :
136 4 : let prefix = c"neon-file:";
137 4 : let prefixlen = prefix.to_bytes_with_nul().len();
138 4 : assert!(msg_size >= prefixlen);
139 4 : let message = vec![0; msg_size - prefixlen];
140 4 :
141 4 : let walgen =
142 4 : &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
143 4 : for _ in 0..msg_count {
144 610 : let (lsn, record) = walgen.next().unwrap();
145 610 : if let Some(ref mut lsns) = next_record_lsns {
146 210 : lsns.push(lsn);
147 400 : }
148 :
149 610 : let req = AppendRequest {
150 610 : h: AppendRequestHeader {
151 610 : generation: Generation::new(0),
152 610 : term: 1,
153 610 : begin_lsn: lsn,
154 610 : end_lsn: lsn + record.len() as u64,
155 610 : commit_lsn: lsn,
156 610 : truncate_lsn: Lsn(0),
157 610 : },
158 610 : wal_data: record,
159 610 : };
160 610 :
161 610 : let end_lsn = req.h.end_lsn;
162 610 :
163 610 : let msg = ProposerAcceptorMessage::AppendRequest(req);
164 610 : msg_tx.send(msg).await?;
165 610 : while let Some(reply) = reply_rx.recv().await {
166 610 : if let AcceptorProposerMessage::AppendResponse(resp) = reply {
167 610 : if resp.flush_lsn >= end_lsn {
168 610 : break;
169 0 : }
170 0 : }
171 : }
172 : }
173 :
174 4 : Ok(end_watch)
175 4 : }
176 : }
|