//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment is updated (`flush_lsn`
//! changes), the segment is uploaded to S3 within about 15 minutes.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
//! - `Segment` – the segment name, like `000000010000000000000001`
//! - `Term` – the current term
//! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
//! - `Commit` – commit_lsn in the same hex format
//! - `NN` – safekeeper_id, like `1`
//!
//! An example of a full object name:
//! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
//!
//! Each safekeeper keeps info about remote partial segments in its control
//! file. The code updates state in the control file before doing any S3
//! operations. This way the control file stores information about all
//! potentially existing remote partial segments and can clean them up after
//! uploading a newer version.
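//!
//! As an illustration (a sketch, not part of this module's API), the example
//! name above can be reproduced with the same format string that
//! `remote_segment_name` uses below; note that `flush_lsn` comes before
//! `commit_lsn` in the name:
//!
//! ```text
//! format!(
//!     "{}_{}_{:016X}_{:016X}_sk{}.partial",
//!     "000000010000000000000002", 2, 0x2534868, 0x2534410, 1,
//! )
//! ```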
use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};

use tracing::{debug, error, info, instrument, warn};
use utils::{id::NodeId, lsn::Lsn};

use crate::{
    metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
    rate_limit::{rand_duration, RateLimiter},
    safekeeper::Term,
    timeline::WalResidentTimeline,
    timeline_manager::StateSnapshot,
    wal_backup::{self, remote_timeline_path},
    SafeKeeperConf,
};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress. This status is used only for garbage
    /// collection; don't read data from remote storage for segments in this
    /// state.
    InProgress,
    /// Upload is finished. There is always at most one segment with this
    /// status; it is the current segment and its data can be used.
    Uploaded,
    /// Deletion is in progress. This status is used only for garbage
    /// collection; don't read data from remote storage for segments in this
    /// state.
    Deleting,
}
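
// Expected status lifecycle, as implemented by `do_upload` and `gc` below: a
// segment is recorded as `InProgress` before the upload starts, flipped to
// `Uploaded` after a successful upload (while all other segments are demoted
// to `Deleting`), and removed from the state entirely once `gc` has deleted
// it from remote storage.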

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    pub status: UploadStatus,
    pub name: String,
    pub commit_lsn: Lsn,
    pub flush_lsn: Lsn,
    // We should use last_log_term here, otherwise it's possible to have
    // inconsistent data in the remote storage.
    //
    // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
    pub term: Term,
}

impl PartialRemoteSegment {
    fn eq_without_status(&self, other: &Self) -> bool {
        self.name == other.name
            && self.commit_lsn == other.commit_lsn
            && self.flush_lsn == other.flush_lsn
            && self.term == other.term
    }

    pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
        remote_timeline_path.join(&self.name)
    }
}

// NB: these structures are part of the control file, so you can't change them
// without bumping the control file format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    pub segments: Vec<PartialRemoteSegment>,
}

#[derive(Debug)]
pub(crate) struct ReplaceUploadedSegment {
    pub(crate) previous: PartialRemoteSegment,
    pub(crate) current: PartialRemoteSegment,
}

impl State {
    /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
    pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
        self.segments
            .iter()
            .find(|seg| seg.status == UploadStatus::Uploaded)
            .cloned()
    }

    /// Replace the name of the Uploaded segment (if one exists) so that it
    /// matches the `destination` safekeeper. Returns a description of the
    /// change, or `None` if there is no Uploaded segment, wrapped in
    /// `anyhow::Result`.
    pub(crate) fn replace_uploaded_segment(
        &mut self,
        source: NodeId,
        destination: NodeId,
    ) -> anyhow::Result<Option<ReplaceUploadedSegment>> {
        let current = self
            .segments
            .iter_mut()
            .find(|seg| seg.status == UploadStatus::Uploaded);

        let current = match current {
            Some(some) => some,
            None => {
                return anyhow::Ok(None);
            }
        };

        // Sanity check that the partial segment we are replacing belongs to
        // the `source` SK.
        if !current
            .name
            .ends_with(format!("sk{}.partial", source.0).as_str())
        {
            anyhow::bail!(
                "Partial segment name ({}) doesn't match source node id ({})",
                current.name,
                source
            );
        }

        let previous = current.clone();

        let new_name = current.name.replace(
            format!("_sk{}", source.0).as_str(),
            format!("_sk{}", destination.0).as_str(),
        );

        current.name = new_name;

        anyhow::Ok(Some(ReplaceUploadedSegment {
            previous,
            current: current.clone(),
        }))
    }
}

struct PartialBackup {
    wal_seg_size: usize,
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    local_prefix: Utf8PathBuf,
    remote_timeline_path: RemotePath,

    state: State,
}

// Read-only methods for getting segment names
impl PartialBackup {
    fn segno(&self, lsn: Lsn) -> XLogSegNo {
        lsn.segment_number(self.wal_seg_size)
    }

    fn segment_name(&self, segno: u64) -> String {
        XLogFileName(PG_TLI, segno, self.wal_seg_size)
    }

    fn remote_segment_name(
        &self,
        segno: u64,
        term: u64,
        commit_lsn: Lsn,
        flush_lsn: Lsn,
    ) -> String {
        format!(
            "{}_{}_{:016X}_{:016X}_sk{}.partial",
            self.segment_name(segno),
            term,
            flush_lsn.0,
            commit_lsn.0,
            self.conf.my_id.0,
        )
    }

    fn local_segment_name(&self, segno: u64) -> String {
        format!("{}.partial", self.segment_name(segno))
    }
}

impl PartialBackup {
    /// Takes a lock to read the current safekeeper state and returns a
    /// segment that should be uploaded.
    async fn prepare_upload(&self) -> PartialRemoteSegment {
        // this operation takes a lock to get the current state
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        let flush_lsn = Lsn(sk_info.flush_lsn);
        let commit_lsn = Lsn(sk_info.commit_lsn);
        let last_log_term = sk_info.last_log_term;
        let segno = self.segno(flush_lsn);

        let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);

        PartialRemoteSegment {
            status: UploadStatus::InProgress,
            name,
            commit_lsn,
            flush_lsn,
            term: last_log_term,
        }
    }

    /// Reads the segment from disk and uploads it to remote storage.
    async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
        let flush_lsn = prepared.flush_lsn;
        let segno = self.segno(flush_lsn);

        // We're going to back up bytes from the start of the segment up to flush_lsn.
        let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

        let local_path = self.local_prefix.join(self.local_segment_name(segno));
        let remote_path = prepared.remote_path(&self.remote_timeline_path);

        // Upload the first `backup_bytes` bytes of the segment to remote storage.
        wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
        PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);

        // We uploaded the segment; now let's verify that the data is still
        // valid. If the term changed during the upload, we cannot guarantee
        // the validity of the uploaded data. If the term is the same, we know
        // the data is not corrupted.
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        if sk_info.last_log_term != prepared.term {
            anyhow::bail!("term changed during upload");
        }
        assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
        assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));

        Ok(())
    }

    /// Write the new state to disk. If the in-memory and on-disk states have
    /// diverged, returns an error.
    async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
        self.tli
            .map_control_file(|cf| {
                if cf.partial_backup != self.state {
                    let memory = self.state.clone();
                    self.state = cf.partial_backup.clone();
                    anyhow::bail!(
                        "partial backup state diverged, memory={:?}, disk={:?}",
                        memory,
                        cf.partial_backup
                    );
                }

                cf.partial_backup = new_state.clone();
                Ok(())
            })
            .await?;
        // update the in-memory state
        self.state = new_state;
        Ok(())
    }

    /// Upload the latest version of the partial segment and garbage collect older versions.
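    ///
    /// The sequence is: (1) persist the new segment as `InProgress` in the
    /// control file, (2) upload the bytes to remote storage, (3) persist the
    /// segment as `Uploaded` while demoting all others to `Deleting`, and
    /// (4) run `gc` to delete the demoted ones. Because the state is
    /// persisted before each remote operation, a crash at any point leaves
    /// only remote objects that the control file knows about and can garbage
    /// collect later.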
    #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
    async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
        let _timer = MISC_OPERATION_SECONDS
            .with_label_values(&["partial_do_upload"])
            .start_timer();
        info!("starting upload {:?}", prepared);

        let state_0 = self.state.clone();
        let state_1 = {
            let mut state = state_0.clone();
            state.segments.push(prepared.clone());
            state
        };

        // We're going to upload a new segment: persist it to disk first, so
        // that it can be garbage collected later even if the upload fails midway.
        self.commit_state(state_1).await?;

        self.upload_segment(prepared.clone()).await?;

        let state_2 = {
            let mut state = state_0.clone();
            for seg in state.segments.iter_mut() {
                seg.status = UploadStatus::Deleting;
            }
            let mut actual_remote_segment = prepared.clone();
            actual_remote_segment.status = UploadStatus::Uploaded;
            state.segments.push(actual_remote_segment);
            state
        };

        // We've uploaded the new segment; it is now the current one, and all
        // other segments should be GCed.
        self.commit_state(state_2).await?;
        self.gc().await?;

        Ok(())
    }

    /// Delete all non-Uploaded segments from remote storage. There should be
    /// only one Uploaded segment at a time.
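    ///
    /// Note that every segment deleted here was already persisted in the
    /// control file with a non-`Uploaded` status, so a crash between the
    /// remote deletion and the final state commit leaves no remote object
    /// that the control file doesn't know about.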
    #[instrument(name = "gc", skip_all)]
    async fn gc(&mut self) -> anyhow::Result<()> {
        let mut segments_to_delete = vec![];

        let new_segments: Vec<PartialRemoteSegment> = self
            .state
            .segments
            .iter()
            .filter_map(|seg| {
                if seg.status == UploadStatus::Uploaded {
                    Some(seg.clone())
                } else {
                    segments_to_delete.push(seg.name.clone());
                    None
                }
            })
            .collect();

        if new_segments.len() == 1 {
            // we have an uploaded segment, it must not be deleted from remote storage
            segments_to_delete.retain(|name| name != &new_segments[0].name);
        } else {
            // there should always be zero or one uploaded segment
            assert!(
                new_segments.is_empty(),
                "too many uploaded segments: {:?}",
                new_segments
            );
        }

        info!("deleting objects: {:?}", segments_to_delete);
        let mut objects_to_delete = vec![];
        for seg in segments_to_delete.iter() {
            let remote_path = self.remote_timeline_path.join(seg);
            objects_to_delete.push(remote_path);
        }

        // remove the segments from remote storage
        wal_backup::delete_objects(&objects_to_delete).await?;

        // now we can update the state on disk
        let new_state = {
            let mut state = self.state.clone();
            state.segments = new_segments;
            state
        };
        self.commit_state(new_state).await?;

        Ok(())
    }
}

/// Check if everything is uploaded and the partial backup task doesn't need to run.
pub(crate) fn needs_uploading(
    state: &StateSnapshot,
    uploaded: &Option<PartialRemoteSegment>,
) -> bool {
    match uploaded {
        Some(uploaded) => {
            uploaded.status != UploadStatus::Uploaded
                || uploaded.flush_lsn != state.flush_lsn
                || uploaded.commit_lsn != state.commit_lsn
                || uploaded.term != state.last_log_term
        }
        None => true,
    }
}

/// Main task for partial backup. It waits for the flush_lsn to change and
/// then uploads the partial segment to remote storage. It also garbage
/// collects old segments.
///
/// When there is nothing more to do and the last segment was successfully
/// uploaded, the task returns a `PartialRemoteSegment` to signal readiness
/// for offloading the timeline.
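///
/// In outline, each iteration of the main loop: runs `gc` if the state has
/// accumulated too many segments, returns early if the current in-memory
/// state already matches the uploaded segment, waits out the upload timeout
/// (restarting the wait if the segment number changes meanwhile), and then
/// uploads via `do_upload`.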
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    limiter: RateLimiter,
) -> Option<PartialRemoteSegment> {
    debug!("started");
    let await_duration = conf.partial_backup_timeout;
    let mut first_iteration = true;

    let (_, persistent_state) = tli.get_state().await;
    let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
    let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
    let wal_seg_size = tli.get_wal_seg_size().await;

    let local_prefix = tli.get_timeline_dir();
    let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
        Ok(path) => path,
        Err(e) => {
            error!("failed to create remote path: {:?}", e);
            return None;
        }
    };

    let mut backup = PartialBackup {
        wal_seg_size,
        tli,
        state: persistent_state.partial_backup,
        conf,
        local_prefix,
        remote_timeline_path,
    };

    debug!("state: {:?}", backup.state);

    // The general idea is that each safekeeper keeps only one partial segment
    // both in remote storage and in local state. If this is not true,
    // something went wrong.
    const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;

    'outer: loop {
        if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
            warn!(
                "too many segments in control_file state, running gc: {}",
                backup.state.segments.len()
            );

            backup.gc().await.unwrap_or_else(|e| {
                error!("failed to run gc: {:#}", e);
            });
        }

        // wait until we have something to upload
        let uploaded_segment = backup.state.uploaded_segment();
        if let Some(seg) = &uploaded_segment {
            // check if the uploaded segment matches the current state
            if flush_lsn_rx.borrow().lsn == seg.flush_lsn
                && *commit_lsn_rx.borrow() == seg.commit_lsn
                && flush_lsn_rx.borrow().term == seg.term
            {
                // we have nothing to do, the last segment is already uploaded
                return Some(seg.clone());
            }
        }

        // if we don't have any data yet (zero flush LSN), wait until something arrives
        while flush_lsn_rx.borrow().lsn == Lsn(0) {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = flush_lsn_rx.changed() => {}
            }
        }

        // Smooth the load after restart by sleeping for a random duration on
        // the first iteration; on all later iterations wait for the full
        // await_duration.
        let await_duration = if first_iteration {
            first_iteration = false;
            rand_duration(&await_duration)
        } else {
            await_duration
        };

        // fix the segno and wait some time to prevent reuploading the same
        // segment too often
        let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
        let timeout = tokio::time::sleep(await_duration);
        tokio::pin!(timeout);
        let mut timeout_expired = false;

        // wait until the timeout expires OR the segno changes
        'inner: loop {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = commit_lsn_rx.changed() => {}
                _ = flush_lsn_rx.changed() => {
                    let segno = backup.segno(flush_lsn_rx.borrow().lsn);
                    if segno != pending_segno {
                        // the previous segment is no longer partial, abort the wait
                        break 'inner;
                    }
                }
                _ = &mut timeout => {
                    // timeout expired, now we are ready for upload
                    timeout_expired = true;
                    break 'inner;
                }
            }
        }

        if !timeout_expired {
            // the segno has likely changed, let's try again in the next iteration
            continue 'outer;
        }

        // limit concurrent uploads
        let _upload_permit = limiter.acquire_partial_backup().await;

        let prepared = backup.prepare_upload().await;
        if let Some(seg) = &uploaded_segment {
            if seg.eq_without_status(&prepared) {
                // we already uploaded this segment, nothing to do
                continue 'outer;
            }
        }

        match backup.do_upload(&prepared).await {
            Ok(()) => {
                debug!(
                    "uploaded {} up to flush_lsn {}",
                    prepared.name, prepared.flush_lsn
                );
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
            }
            Err(e) => {
                info!("failed to upload {}: {:#}", prepared.name, e);
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
            }
        }
    }
}
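
#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sketch exercising the pure state-manipulation logic above.
    // All concrete values (names, LSNs, node ids) are made up for
    // illustration and are not tied to any real timeline.
    fn segment(name: &str) -> PartialRemoteSegment {
        PartialRemoteSegment {
            status: UploadStatus::Uploaded,
            name: name.to_string(),
            commit_lsn: Lsn(0x2534410),
            flush_lsn: Lsn(0x2534868),
            term: 2,
        }
    }

    #[test]
    fn replace_uploaded_segment_renames_matching_segment() {
        let name = "000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial";
        let mut state = State {
            segments: vec![segment(name)],
        };

        // Renaming from sk1 to sk2 succeeds and reports the change.
        let replaced = state
            .replace_uploaded_segment(NodeId(1), NodeId(2))
            .expect("name ends with the source sk id")
            .expect("there is an Uploaded segment");
        assert!(replaced.previous.name.ends_with("_sk1.partial"));
        assert!(replaced.current.name.ends_with("_sk2.partial"));

        // A source id that doesn't match the current name is rejected.
        assert!(state.replace_uploaded_segment(NodeId(1), NodeId(3)).is_err());

        // An empty state is a no-op.
        let mut empty = State::default();
        assert!(empty
            .replace_uploaded_segment(NodeId(1), NodeId(2))
            .expect("no-op on empty state")
            .is_none());
    }
}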