Line data Source code
1 : use std::sync::Arc;
2 :
3 : use crate::rate_limit::RateLimiter;
4 : use crate::receive_wal::WalAcceptor;
5 : use crate::safekeeper::{
6 : AcceptorProposerMessage, AppendRequest, AppendRequestHeader, ProposerAcceptorMessage,
7 : ProposerElected, SafeKeeper, TermHistory,
8 : };
9 : use crate::send_wal::EndWatch;
10 : use crate::state::{TimelinePersistentState, TimelineState};
11 : use crate::timeline::{get_timeline_dir, SharedState, StateSK, Timeline};
12 : use crate::timelines_set::TimelinesSet;
13 : use crate::wal_backup::remote_timeline_path;
14 : use crate::{control_file, receive_wal, wal_storage, SafeKeeperConf};
15 : use camino_tempfile::Utf8TempDir;
16 : use postgres_ffi::v17::wal_generator::{LogicalMessageGenerator, WalGenerator};
17 : use tokio::fs::create_dir_all;
18 : use utils::id::{NodeId, TenantTimelineId};
19 : use utils::lsn::Lsn;
20 :
21 : /// A Safekeeper testing or benchmarking environment. Uses a tempdir for storage, removed on drop.
22 : pub struct Env {
23 : /// Whether to enable fsync.
24 : pub fsync: bool,
25 : /// Benchmark directory. Deleted when dropped.
26 : pub tempdir: Utf8TempDir,
27 : }
28 :
29 : impl Env {
30 : /// Creates a new test or benchmarking environment in a temporary directory. fsync controls whether to
31 : /// enable fsyncing.
32 3 : pub fn new(fsync: bool) -> anyhow::Result<Self> {
33 3 : let tempdir = camino_tempfile::tempdir()?;
34 3 : Ok(Self { fsync, tempdir })
35 3 : }
36 :
37 : /// Constructs a Safekeeper config for the given node ID.
38 6 : fn make_conf(&self, node_id: NodeId) -> SafeKeeperConf {
39 6 : let mut conf = SafeKeeperConf::dummy();
40 6 : conf.my_id = node_id;
41 6 : conf.no_sync = !self.fsync;
42 6 : conf.workdir = self.tempdir.path().join(format!("safekeeper-{node_id}"));
43 6 : conf
44 6 : }
45 :
46 : /// Constructs a Safekeeper with the given node and tenant/timeline ID.
47 : ///
48 : /// TODO: we should support using in-memory storage, to measure non-IO costs. This would be
49 : /// easier if SafeKeeper used trait objects for storage rather than generics. It's also not
50 : /// currently possible to construct a timeline using non-file storage since StateSK only accepts
51 : /// SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>.
52 3 : pub async fn make_safekeeper(
53 3 : &self,
54 3 : node_id: NodeId,
55 3 : ttid: TenantTimelineId,
56 3 : start_lsn: Lsn,
57 3 : ) -> anyhow::Result<SafeKeeper<control_file::FileStorage, wal_storage::PhysicalStorage>> {
58 3 : let conf = self.make_conf(node_id);
59 3 :
60 3 : let timeline_dir = get_timeline_dir(&conf, &ttid);
61 3 : create_dir_all(&timeline_dir).await?;
62 :
63 3 : let mut pstate = TimelinePersistentState::empty();
64 3 : pstate.tenant_id = ttid.tenant_id;
65 3 : pstate.timeline_id = ttid.timeline_id;
66 :
67 3 : let wal = wal_storage::PhysicalStorage::new(&ttid, &timeline_dir, &pstate, conf.no_sync)?;
68 3 : let ctrl =
69 3 : control_file::FileStorage::create_new(&timeline_dir, pstate, conf.no_sync).await?;
70 3 : let state = TimelineState::new(ctrl);
71 3 : let mut safekeeper = SafeKeeper::new(state, wal, conf.my_id)?;
72 :
73 : // Emulate an initial election.
74 3 : safekeeper
75 3 : .process_msg(&ProposerAcceptorMessage::Elected(ProposerElected {
76 3 : term: 1,
77 3 : start_streaming_at: start_lsn,
78 3 : term_history: TermHistory(vec![(1, start_lsn).into()]),
79 3 : timeline_start_lsn: start_lsn,
80 3 : }))
81 3 : .await?;
82 :
83 3 : Ok(safekeeper)
84 3 : }
85 :
86 : /// Constructs a timeline, including a new Safekeeper with the given node ID, and spawns its
87 : /// manager task.
88 3 : pub async fn make_timeline(
89 3 : &self,
90 3 : node_id: NodeId,
91 3 : ttid: TenantTimelineId,
92 3 : start_lsn: Lsn,
93 3 : ) -> anyhow::Result<Arc<Timeline>> {
94 3 : let conf = Arc::new(self.make_conf(node_id));
95 3 : let timeline_dir = get_timeline_dir(&conf, &ttid);
96 3 : let remote_path = remote_timeline_path(&ttid)?;
97 :
98 3 : let safekeeper = self.make_safekeeper(node_id, ttid, start_lsn).await?;
99 3 : let shared_state = SharedState::new(StateSK::Loaded(safekeeper));
100 3 :
101 3 : let timeline = Timeline::new(
102 3 : ttid,
103 3 : &timeline_dir,
104 3 : &remote_path,
105 3 : shared_state,
106 3 : conf.clone(),
107 3 : );
108 3 : timeline.bootstrap(
109 3 : &mut timeline.write_shared_state().await,
110 3 : &conf,
111 3 : Arc::new(TimelinesSet::default()), // ignored for now
112 3 : RateLimiter::new(0, 0),
113 3 : );
114 3 : Ok(timeline)
115 3 : }
116 :
117 : // This will be dead code when building a non-benchmark target with the
118 : // benchmarking feature enabled.
119 : #[allow(dead_code)]
120 3 : pub(crate) async fn write_wal(
121 3 : tli: Arc<Timeline>,
122 3 : start_lsn: Lsn,
123 3 : msg_size: usize,
124 3 : msg_count: usize,
125 3 : ) -> anyhow::Result<EndWatch> {
126 3 : let (msg_tx, msg_rx) = tokio::sync::mpsc::channel(receive_wal::MSG_QUEUE_SIZE);
127 3 : let (reply_tx, mut reply_rx) = tokio::sync::mpsc::channel(receive_wal::REPLY_QUEUE_SIZE);
128 3 :
129 3 : let end_watch = EndWatch::Commit(tli.get_commit_lsn_watch_rx());
130 3 :
131 3 : WalAcceptor::spawn(tli.wal_residence_guard().await?, msg_rx, reply_tx, Some(0));
132 3 :
133 3 : let prefix = c"p";
134 3 : let prefixlen = prefix.to_bytes_with_nul().len();
135 3 : assert!(msg_size >= prefixlen);
136 3 : let message = vec![0; msg_size - prefixlen];
137 3 :
138 3 : let walgen =
139 3 : &mut WalGenerator::new(LogicalMessageGenerator::new(prefix, &message), start_lsn);
140 3 : for _ in 0..msg_count {
141 600 : let (lsn, record) = walgen.next().unwrap();
142 600 :
143 600 : let req = AppendRequest {
144 600 : h: AppendRequestHeader {
145 600 : term: 1,
146 600 : term_start_lsn: start_lsn,
147 600 : begin_lsn: lsn,
148 600 : end_lsn: lsn + record.len() as u64,
149 600 : commit_lsn: lsn,
150 600 : truncate_lsn: Lsn(0),
151 600 : proposer_uuid: [0; 16],
152 600 : },
153 600 : wal_data: record,
154 600 : };
155 600 :
156 600 : let end_lsn = req.h.end_lsn;
157 600 :
158 600 : let msg = ProposerAcceptorMessage::AppendRequest(req);
159 600 : msg_tx.send(msg).await?;
160 600 : while let Some(reply) = reply_rx.recv().await {
161 600 : if let AcceptorProposerMessage::AppendResponse(resp) = reply {
162 600 : if resp.flush_lsn >= end_lsn {
163 600 : break;
164 0 : }
165 0 : }
166 : }
167 : }
168 :
169 3 : Ok(end_watch)
170 3 : }
171 : }
|