Line data Source code
//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment is updated (`flush_lsn`
//! changes), the segment is uploaded to remote storage (S3) once
//! `partial_backup_timeout` (about 15 minutes) has elapsed.
4 : //!
5 : //! The filename format for partial segments is
6 : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
7 : //! - `Segment` – the segment name, like `000000010000000000000001`
8 : //! - `Term` – current term
9 : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
10 : //! - `Commit` – commit_lsn in the same hex format
11 : //! - `NN` – safekeeper_id, like `1`
12 : //!
13 : //! The full object name example:
14 : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
15 : //!
16 : //! Each safekeeper will keep info about remote partial segments in its control
17 : //! file. Code updates state in the control file before doing any S3 operations.
18 : //! This way control file stores information about all potentially existing
19 : //! remote partial segments and can clean them up after uploading a newer version.
20 :
21 : use std::sync::Arc;
22 :
23 : use camino::Utf8PathBuf;
24 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
25 : use rand::Rng;
26 : use remote_storage::RemotePath;
27 : use serde::{Deserialize, Serialize};
28 :
29 : use tracing::{debug, error, info, instrument};
30 : use utils::lsn::Lsn;
31 :
32 : use crate::{
33 : metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
34 : safekeeper::Term,
35 : timeline::Timeline,
36 : wal_backup, SafeKeeperConf,
37 : };
38 :
/// State of a single remote partial segment object, as tracked in the control file
/// (it is stored inside [`PartialRemoteSegment`], which is stored inside [`State`]).
///
/// NOTE(review): this enum is persisted; variant order/names may matter for the
/// control file encoding — do not reorder or rename without bumping the format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress. `do_upload` records the segment with this status before
    /// starting the upload, so a failed upload leaves it behind for later GC.
    InProgress,
    /// Upload is finished. There should be only one segment with this status at a time.
    Uploaded,
    /// Deletion is in progress. `gc` removes such segments from remote storage.
    Deleting,
}
48 :
/// Descriptor of one (potentially existing) partial segment object in remote storage.
///
/// NOTE(review): persisted in the control file; field order may matter for the encoding.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    /// Lifecycle status of the remote object.
    pub status: UploadStatus,
    /// Remote object name, `Segment_Term_Flush_Commit_skNN.partial`.
    pub name: String,
    /// commit_lsn observed when the upload was prepared.
    pub commit_lsn: Lsn,
    /// flush_lsn observed when the upload was prepared; the object holds the segment
    /// bytes from its start up to this LSN.
    pub flush_lsn: Lsn,
    /// Safekeeper term observed when the upload was prepared.
    pub term: Term,
}
57 :
58 : impl PartialRemoteSegment {
59 0 : fn eq_without_status(&self, other: &Self) -> bool {
60 0 : self.name == other.name
61 0 : && self.commit_lsn == other.commit_lsn
62 0 : && self.flush_lsn == other.flush_lsn
63 0 : && self.term == other.term
64 0 : }
65 : }
66 :
// NB: these structures are a part of a control_file, you can't change them without
// changing the control file format version.
/// Partial backup state persisted in the control file: every remote partial segment
/// that may currently exist, so that stale versions can be garbage collected.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    /// Known remote segments; at most one is expected to be `Uploaded`.
    pub segments: Vec<PartialRemoteSegment>,
}
73 :
74 : impl State {
75 : /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
76 0 : fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
77 0 : self.segments
78 0 : .iter()
79 0 : .find(|seg| seg.status == UploadStatus::Uploaded)
80 0 : .cloned()
81 0 : }
82 : }
83 :
/// Per-timeline worker state for uploading the current partial WAL segment.
struct PartialBackup {
    /// WAL segment size in bytes, used to map LSNs to segment numbers/offsets.
    wal_seg_size: usize,
    /// Timeline being backed up.
    tli: Arc<Timeline>,
    /// Safekeeper configuration (provides `my_id` for object names).
    conf: SafeKeeperConf,
    /// Local directory holding the timeline's WAL segment files.
    local_prefix: Utf8PathBuf,
    /// Timeline directory relative to the safekeeper workdir, used as the remote key prefix.
    remote_prefix: Utf8PathBuf,

    /// In-memory copy of the partial backup state stored in the control file.
    state: State,
}
93 :
94 : // Read-only methods for getting segment names
95 : impl PartialBackup {
96 0 : fn segno(&self, lsn: Lsn) -> XLogSegNo {
97 0 : lsn.segment_number(self.wal_seg_size)
98 0 : }
99 :
100 0 : fn segment_name(&self, segno: u64) -> String {
101 0 : XLogFileName(PG_TLI, segno, self.wal_seg_size)
102 0 : }
103 :
104 0 : fn remote_segment_name(
105 0 : &self,
106 0 : segno: u64,
107 0 : term: u64,
108 0 : commit_lsn: Lsn,
109 0 : flush_lsn: Lsn,
110 0 : ) -> String {
111 0 : format!(
112 0 : "{}_{}_{:016X}_{:016X}_sk{}.partial",
113 0 : self.segment_name(segno),
114 0 : term,
115 0 : flush_lsn.0,
116 0 : commit_lsn.0,
117 0 : self.conf.my_id.0,
118 0 : )
119 0 : }
120 :
121 0 : fn local_segment_name(&self, segno: u64) -> String {
122 0 : format!("{}.partial", self.segment_name(segno))
123 0 : }
124 : }
125 :
126 : impl PartialBackup {
127 : /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
128 0 : async fn prepare_upload(&self) -> PartialRemoteSegment {
129 : // this operation takes a lock to get the actual state
130 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
131 0 : let flush_lsn = Lsn(sk_info.flush_lsn);
132 0 : let commit_lsn = Lsn(sk_info.commit_lsn);
133 0 : let term = sk_info.term;
134 0 : let segno = self.segno(flush_lsn);
135 0 :
136 0 : let name = self.remote_segment_name(segno, term, commit_lsn, flush_lsn);
137 0 :
138 0 : PartialRemoteSegment {
139 0 : status: UploadStatus::InProgress,
140 0 : name,
141 0 : commit_lsn,
142 0 : flush_lsn,
143 0 : term,
144 0 : }
145 0 : }
146 :
147 : /// Reads segment from disk and uploads it to the remote storage.
148 0 : async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
149 0 : let flush_lsn = prepared.flush_lsn;
150 0 : let segno = self.segno(flush_lsn);
151 0 :
152 0 : // We're going to backup bytes from the start of the segment up to flush_lsn.
153 0 : let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
154 0 :
155 0 : let local_path = self.local_prefix.join(self.local_segment_name(segno));
156 0 : let remote_path = RemotePath::new(self.remote_prefix.join(&prepared.name).as_ref())?;
157 :
158 : // Upload first `backup_bytes` bytes of the segment to the remote storage.
159 0 : wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
160 0 : PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
161 :
162 : // We uploaded the segment, now let's verify that the data is still actual.
163 : // If the term changed, we cannot guarantee the validity of the uploaded data.
164 : // If the term is the same, we know the data is not corrupted.
165 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
166 0 : if sk_info.term != prepared.term {
167 0 : anyhow::bail!("term changed during upload");
168 0 : }
169 0 : assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
170 0 : assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
171 :
172 0 : Ok(())
173 0 : }
174 :
175 : /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
176 0 : async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
177 0 : self.tli
178 0 : .map_control_file(|cf| {
179 0 : if cf.partial_backup != self.state {
180 0 : let memory = self.state.clone();
181 0 : self.state = cf.partial_backup.clone();
182 0 : anyhow::bail!(
183 0 : "partial backup state diverged, memory={:?}, disk={:?}",
184 0 : memory,
185 0 : cf.partial_backup
186 0 : );
187 0 : }
188 0 :
189 0 : cf.partial_backup = new_state.clone();
190 0 : Ok(())
191 0 : })
192 0 : .await?;
193 : // update in-memory state
194 0 : self.state = new_state;
195 0 : Ok(())
196 0 : }
197 :
198 : /// Upload the latest version of the partial segment and garbage collect older versions.
199 0 : #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
200 : async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
201 : info!("starting upload {:?}", prepared);
202 :
203 : let state_0 = self.state.clone();
204 : let state_1 = {
205 : let mut state = state_0.clone();
206 : state.segments.push(prepared.clone());
207 : state
208 : };
209 :
210 : // we're going to upload a new segment, let's write it to disk to make GC later
211 : self.commit_state(state_1).await?;
212 :
213 : self.upload_segment(prepared.clone()).await?;
214 :
215 : let state_2 = {
216 : let mut state = state_0.clone();
217 : for seg in state.segments.iter_mut() {
218 : seg.status = UploadStatus::Deleting;
219 : }
220 : let mut actual_remote_segment = prepared.clone();
221 : actual_remote_segment.status = UploadStatus::Uploaded;
222 : state.segments.push(actual_remote_segment);
223 : state
224 : };
225 :
226 : // we've uploaded new segment, it's actual, all other segments should be GCed
227 : self.commit_state(state_2).await?;
228 : self.gc().await?;
229 :
230 : Ok(())
231 : }
232 :
233 : /// Delete all non-Uploaded segments from the remote storage. There should be only one
234 : /// Uploaded segment at a time.
235 0 : #[instrument(name = "gc", skip_all)]
236 : async fn gc(&mut self) -> anyhow::Result<()> {
237 : let mut segments_to_delete = vec![];
238 :
239 : let new_segments: Vec<PartialRemoteSegment> = self
240 : .state
241 : .segments
242 : .iter()
243 0 : .filter_map(|seg| {
244 0 : if seg.status == UploadStatus::Uploaded {
245 0 : Some(seg.clone())
246 : } else {
247 0 : segments_to_delete.push(seg.name.clone());
248 0 : None
249 : }
250 0 : })
251 : .collect();
252 :
253 : info!("deleting objects: {:?}", segments_to_delete);
254 : let mut objects_to_delete = vec![];
255 : for seg in segments_to_delete.iter() {
256 : let remote_path = RemotePath::new(self.remote_prefix.join(seg).as_ref())?;
257 : objects_to_delete.push(remote_path);
258 : }
259 :
260 : // removing segments from remote storage
261 : wal_backup::delete_objects(&objects_to_delete).await?;
262 :
263 : // now we can update the state on disk
264 : let new_state = {
265 : let mut state = self.state.clone();
266 : state.segments = new_segments;
267 : state
268 : };
269 : self.commit_state(new_state).await?;
270 :
271 : Ok(())
272 : }
273 : }
274 :
275 0 : #[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
276 : pub async fn main_task(tli: Arc<Timeline>, conf: SafeKeeperConf) {
277 : debug!("started");
278 : let await_duration = conf.partial_backup_timeout;
279 :
280 : let mut cancellation_rx = match tli.get_cancellation_rx() {
281 : Ok(rx) => rx,
282 : Err(_) => {
283 : info!("timeline canceled during task start");
284 : return;
285 : }
286 : };
287 :
288 : // sleep for random time to avoid thundering herd
289 : {
290 : let randf64 = rand::thread_rng().gen_range(0.0..1.0);
291 : let sleep_duration = await_duration.mul_f64(randf64);
292 : tokio::time::sleep(sleep_duration).await;
293 : }
294 :
295 : let (_, persistent_state) = tli.get_state().await;
296 : let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
297 : let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
298 : let wal_seg_size = tli.get_wal_seg_size().await;
299 :
300 : let local_prefix = tli.timeline_dir.clone();
301 : let remote_prefix = match tli.timeline_dir.strip_prefix(&conf.workdir) {
302 : Ok(path) => path.to_owned(),
303 : Err(e) => {
304 : error!("failed to strip workspace dir prefix: {:?}", e);
305 : return;
306 : }
307 : };
308 :
309 : let mut backup = PartialBackup {
310 : wal_seg_size,
311 : tli,
312 : state: persistent_state.partial_backup,
313 : conf,
314 : local_prefix,
315 : remote_prefix,
316 : };
317 :
318 : debug!("state: {:?}", backup.state);
319 :
320 : 'outer: loop {
321 : // wait until we have something to upload
322 : let uploaded_segment = backup.state.uploaded_segment();
323 : if let Some(seg) = &uploaded_segment {
324 : // if we already uploaded something, wait until we have something new
325 : while flush_lsn_rx.borrow().lsn == seg.flush_lsn
326 : && *commit_lsn_rx.borrow() == seg.commit_lsn
327 : && flush_lsn_rx.borrow().term == seg.term
328 : {
329 : tokio::select! {
330 : _ = cancellation_rx.changed() => {
331 : info!("timeline canceled");
332 : return;
333 : }
334 : _ = commit_lsn_rx.changed() => {}
335 : _ = flush_lsn_rx.changed() => {}
336 : }
337 : }
338 : }
339 :
340 : // if we don't have any data and zero LSNs, wait for something
341 : while flush_lsn_rx.borrow().lsn == Lsn(0) {
342 : tokio::select! {
343 : _ = cancellation_rx.changed() => {
344 : info!("timeline canceled");
345 : return;
346 : }
347 : _ = flush_lsn_rx.changed() => {}
348 : }
349 : }
350 :
351 : // fixing the segno and waiting some time to prevent reuploading the same segment too often
352 : let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
353 : let timeout = tokio::time::sleep(await_duration);
354 : tokio::pin!(timeout);
355 : let mut timeout_expired = false;
356 :
357 : // waiting until timeout expires OR segno changes
358 : 'inner: loop {
359 : tokio::select! {
360 : _ = cancellation_rx.changed() => {
361 : info!("timeline canceled");
362 : return;
363 : }
364 : _ = commit_lsn_rx.changed() => {}
365 : _ = flush_lsn_rx.changed() => {
366 : let segno = backup.segno(flush_lsn_rx.borrow().lsn);
367 : if segno != pending_segno {
368 : // previous segment is no longer partial, aborting the wait
369 : break 'inner;
370 : }
371 : }
372 : _ = &mut timeout => {
373 : // timeout expired, now we are ready for upload
374 : timeout_expired = true;
375 : break 'inner;
376 : }
377 : }
378 : }
379 :
380 : if !timeout_expired {
381 : // likely segno has changed, let's try again in the next iteration
382 : continue 'outer;
383 : }
384 :
385 : let prepared = backup.prepare_upload().await;
386 : if let Some(seg) = &uploaded_segment {
387 : if seg.eq_without_status(&prepared) {
388 : // we already uploaded this segment, nothing to do
389 : continue 'outer;
390 : }
391 : }
392 :
393 : match backup.do_upload(&prepared).await {
394 : Ok(()) => {
395 : debug!(
396 : "uploaded {} up to flush_lsn {}",
397 : prepared.name, prepared.flush_lsn
398 : );
399 : PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
400 : }
401 : Err(e) => {
402 : info!("failed to upload {}: {:#}", prepared.name, e);
403 : PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
404 : }
405 : }
406 : }
407 : }
|