//! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
//! and `flush_lsn` updates. After the partial segment is updated (`flush_lsn`
//! changes), the segment is uploaded to S3 within about 15 minutes.
//!
//! The filename format for partial segments is
//! `Segment_Term_Flush_Commit_skNN.partial`, where:
//! - `Segment` – the segment name, like `000000010000000000000001`
//! - `Term` – the current term
//! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
//! - `Commit` – commit_lsn in the same hex format
//! - `NN` – safekeeper_id, like `1`
//!
//! A full object name example:
//! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
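//! which breaks down as segment `000000010000000000000002`, term `2`,
//! flush_lsn `0x2534868`, commit_lsn `0x2534410`, and safekeeper id `1`.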
//!
//! Each safekeeper keeps info about remote partial segments in its control
//! file. The code updates the state in the control file before doing any S3
//! operations. This way the control file tracks all potentially existing
//! remote partial segments, and they can be cleaned up after a newer version
//! is uploaded.

use std::sync::Arc;

use camino::Utf8PathBuf;
use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use serde::{Deserialize, Serialize};

use tracing::{debug, error, info, instrument, warn};
use utils::lsn::Lsn;

use crate::{
    metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
    safekeeper::Term,
    timeline::WalResidentTimeline,
    timeline_manager::StateSnapshot,
    wal_backup::{self, remote_timeline_path},
    SafeKeeperConf,
};

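/// Limits how many partial segment uploads may run at the same time.
/// Note that despite the name, this works as a concurrency limiter (a
/// semaphore), not a time-based rate limiter.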
#[derive(Clone)]
pub struct RateLimiter {
    semaphore: Arc<tokio::sync::Semaphore>,
}

impl RateLimiter {
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
        }
    }

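    /// Acquire an owned permit, waiting if the limit is reached. The time
    /// spent waiting is recorded in the `partial_permit_acquire` metric.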
    async fn acquire_owned(&self) -> tokio::sync::OwnedSemaphorePermit {
        let _timer = MISC_OPERATION_SECONDS
            .with_label_values(&["partial_permit_acquire"])
            .start_timer();
        self.semaphore
            .clone()
            .acquire_owned()
            .await
            .expect("semaphore is closed")
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress. This status should be used only for garbage collection,
    /// don't read data from the remote storage with this status.
    InProgress,
    /// Upload is finished. There is always at most one segment with this status.
    /// It means that the segment is up to date and can be used.
    Uploaded,
    /// Deletion is in progress. This status should be used only for garbage collection,
    /// don't read data from the remote storage with this status.
    Deleting,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    pub status: UploadStatus,
    pub name: String,
    pub commit_lsn: Lsn,
    pub flush_lsn: Lsn,
    // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
    // remote storage.
    //
    // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
    pub term: Term,
}

impl PartialRemoteSegment {
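    /// Compares all fields except `status`. Used to detect that a freshly
    /// prepared segment is identical to the one already uploaded.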
    fn eq_without_status(&self, other: &Self) -> bool {
        self.name == other.name
            && self.commit_lsn == other.commit_lsn
            && self.flush_lsn == other.flush_lsn
            && self.term == other.term
    }

    pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
        remote_timeline_path.join(&self.name)
    }
}

// NB: these structures are part of the control file; you can't change them
// without changing the control file format version.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    pub segments: Vec<PartialRemoteSegment>,
}

impl State {
    /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
    pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
        self.segments
            .iter()
            .find(|seg| seg.status == UploadStatus::Uploaded)
            .cloned()
    }
}

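/// Context for the partial backup task of a single timeline: configuration,
/// paths, and the in-memory copy of the persistent state.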
struct PartialBackup {
    wal_seg_size: usize,
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    local_prefix: Utf8PathBuf,
    remote_timeline_path: RemotePath,

    state: State,
}

// Read-only methods for getting segment names
impl PartialBackup {
    fn segno(&self, lsn: Lsn) -> XLogSegNo {
        lsn.segment_number(self.wal_seg_size)
    }

    fn segment_name(&self, segno: u64) -> String {
        XLogFileName(PG_TLI, segno, self.wal_seg_size)
    }

    fn remote_segment_name(
        &self,
        segno: u64,
        term: u64,
        commit_lsn: Lsn,
        flush_lsn: Lsn,
    ) -> String {
        format!(
            "{}_{}_{:016X}_{:016X}_sk{}.partial",
            self.segment_name(segno),
            term,
            flush_lsn.0,
            commit_lsn.0,
            self.conf.my_id.0,
        )
    }

    fn local_segment_name(&self, segno: u64) -> String {
        format!("{}.partial", self.segment_name(segno))
    }
}

impl PartialBackup {
    /// Takes a lock to read the current safekeeper state and returns a segment that should be uploaded.
    async fn prepare_upload(&self) -> PartialRemoteSegment {
        // this operation takes a lock to get the current state
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        let flush_lsn = Lsn(sk_info.flush_lsn);
        let commit_lsn = Lsn(sk_info.commit_lsn);
        let last_log_term = sk_info.last_log_term;
        let segno = self.segno(flush_lsn);

        let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);

        PartialRemoteSegment {
            status: UploadStatus::InProgress,
            name,
            commit_lsn,
            flush_lsn,
            term: last_log_term,
        }
    }

    /// Reads the segment from disk and uploads it to the remote storage.
    async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
        let flush_lsn = prepared.flush_lsn;
        let segno = self.segno(flush_lsn);

        // We're going to back up bytes from the start of the segment up to flush_lsn.
        let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);

        let local_path = self.local_prefix.join(self.local_segment_name(segno));
        let remote_path = prepared.remote_path(&self.remote_timeline_path);

        // Upload the first `backup_bytes` bytes of the segment to the remote storage.
        wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
        PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);

        // We uploaded the segment, now let's verify that the data is still valid.
        // If the term changed, we cannot guarantee the validity of the uploaded data.
        // If the term is the same, we know the data is not corrupted.
        let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
        if sk_info.last_log_term != prepared.term {
            anyhow::bail!("term changed during upload");
        }
        assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
        assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));

        Ok(())
    }

    /// Write the new state to disk. If the in-memory and on-disk states have diverged,
    /// returns an error.
    async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
        self.tli
            .map_control_file(|cf| {
                if cf.partial_backup != self.state {
                    let memory = self.state.clone();
                    self.state = cf.partial_backup.clone();
                    anyhow::bail!(
                        "partial backup state diverged, memory={:?}, disk={:?}",
                        memory,
                        cf.partial_backup
                    );
                }

                cf.partial_backup = new_state.clone();
                Ok(())
            })
            .await?;
        // update in-memory state
        self.state = new_state;
        Ok(())
    }

    /// Upload the latest version of the partial segment and garbage collect older versions.
    #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
    async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
        let _timer = MISC_OPERATION_SECONDS
            .with_label_values(&["partial_do_upload"])
            .start_timer();
        info!("starting upload {:?}", prepared);

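        // The upload is a three-step protocol: persist the new segment as
        // InProgress first, then upload the data, then persist it as Uploaded
        // while marking every other segment as Deleting. Persisting before
        // each remote operation ensures that any object we might have created
        // is tracked in the control file and can be garbage collected.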
        let state_0 = self.state.clone();
        let state_1 = {
            let mut state = state_0.clone();
            state.segments.push(prepared.clone());
            state
        };

        // we're going to upload a new segment, so persist it to disk first to
        // make GC possible later
        self.commit_state(state_1).await?;

        self.upload_segment(prepared.clone()).await?;

        let state_2 = {
            let mut state = state_0.clone();
            for seg in state.segments.iter_mut() {
                seg.status = UploadStatus::Deleting;
            }
            let mut actual_remote_segment = prepared.clone();
            actual_remote_segment.status = UploadStatus::Uploaded;
            state.segments.push(actual_remote_segment);
            state
        };

        // we've uploaded the new segment, it's now the current one; all other
        // segments should be GCed
        self.commit_state(state_2).await?;
        self.gc().await?;

        Ok(())
    }

    /// Delete all non-Uploaded segments from the remote storage. There should be only one
    /// Uploaded segment at a time.
    #[instrument(name = "gc", skip_all)]
    async fn gc(&mut self) -> anyhow::Result<()> {
        let mut segments_to_delete = vec![];

        let new_segments: Vec<PartialRemoteSegment> = self
            .state
            .segments
            .iter()
            .filter_map(|seg| {
                if seg.status == UploadStatus::Uploaded {
                    Some(seg.clone())
                } else {
                    segments_to_delete.push(seg.name.clone());
                    None
                }
            })
            .collect();

        info!("deleting objects: {:?}", segments_to_delete);
        let mut objects_to_delete = vec![];
        for seg in segments_to_delete.iter() {
            let remote_path = self.remote_timeline_path.join(seg);
            objects_to_delete.push(remote_path);
        }

        // removing segments from remote storage
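        // Note: the deletion happens before the control file update below, so
        // if we crash in between, the deleted names stay in the control file
        // with a non-Uploaded status and are simply deleted again on a later
        // gc run.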
        wal_backup::delete_objects(&objects_to_delete).await?;

        // now we can update the state on disk
        let new_state = {
            let mut state = self.state.clone();
            state.segments = new_segments;
            state
        };
        self.commit_state(new_state).await?;

        Ok(())
    }
}

/// Check if everything is uploaded and the partial backup task doesn't need to run.
pub(crate) fn needs_uploading(
    state: &StateSnapshot,
    uploaded: &Option<PartialRemoteSegment>,
) -> bool {
    match uploaded {
        Some(uploaded) => {
            uploaded.status != UploadStatus::Uploaded
                || uploaded.flush_lsn != state.flush_lsn
                || uploaded.commit_lsn != state.commit_lsn
                || uploaded.term != state.last_log_term
        }
        None => true,
    }
}

/// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
/// partial segment to the remote storage. It also does garbage collection of old segments.
///
/// When there is nothing more to do and the last segment was successfully uploaded, the task
/// returns a PartialRemoteSegment to signal readiness for offloading the timeline.
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
    limiter: RateLimiter,
) -> Option<PartialRemoteSegment> {
    debug!("started");
    let await_duration = conf.partial_backup_timeout;

    let (_, persistent_state) = tli.get_state().await;
    let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
    let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
    let wal_seg_size = tli.get_wal_seg_size().await;

    let local_prefix = tli.get_timeline_dir();
    let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
        Ok(path) => path,
        Err(e) => {
            error!("failed to create remote path: {:?}", e);
            return None;
        }
    };

    let mut backup = PartialBackup {
        wal_seg_size,
        tli,
        state: persistent_state.partial_backup,
        conf,
        local_prefix,
        remote_timeline_path,
    };

    debug!("state: {:?}", backup.state);

    // The general idea is that each safekeeper keeps only one partial segment
    // both in remote storage and in local state. If this is not true, something
    // went wrong.
    const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;

    'outer: loop {
        if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
            warn!(
                "too many segments in control_file state, running gc: {}",
                backup.state.segments.len()
            );

            backup.gc().await.unwrap_or_else(|e| {
                error!("failed to run gc: {:#}", e);
            });
        }

        // wait until we have something to upload
        let uploaded_segment = backup.state.uploaded_segment();
        if let Some(seg) = &uploaded_segment {
            // check if the uploaded segment matches the current state
            if flush_lsn_rx.borrow().lsn == seg.flush_lsn
                && *commit_lsn_rx.borrow() == seg.commit_lsn
                && flush_lsn_rx.borrow().term == seg.term
            {
                // we have nothing to do, the last segment is already uploaded
                return Some(seg.clone());
            }
        }

        // if we don't have any data yet and the LSN is still zero, wait for something to happen
        while flush_lsn_rx.borrow().lsn == Lsn(0) {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = flush_lsn_rx.changed() => {}
            }
        }

        // pin the segno and wait some time to prevent reuploading the same segment too often
        let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
        let timeout = tokio::time::sleep(await_duration);
        tokio::pin!(timeout);
        let mut timeout_expired = false;

        // wait until the timeout expires OR the segno changes
        'inner: loop {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = commit_lsn_rx.changed() => {}
                _ = flush_lsn_rx.changed() => {
                    let segno = backup.segno(flush_lsn_rx.borrow().lsn);
                    if segno != pending_segno {
                        // the previous segment is no longer partial, abort the wait
                        break 'inner;
                    }
                }
                _ = &mut timeout => {
                    // timeout expired, now we are ready for upload
                    timeout_expired = true;
                    break 'inner;
                }
            }
        }

        if !timeout_expired {
            // the segno has likely changed, let's try again in the next iteration
            continue 'outer;
        }

        // limit concurrent uploads
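        // (the permit is dropped only at the end of the loop iteration, so it
        // is held across both prepare_upload and do_upload below)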
        let _upload_permit = limiter.acquire_owned().await;

        let prepared = backup.prepare_upload().await;
        if let Some(seg) = &uploaded_segment {
            if seg.eq_without_status(&prepared) {
                // we already uploaded this segment, nothing to do
                continue 'outer;
            }
        }

        match backup.do_upload(&prepared).await {
            Ok(()) => {
                debug!(
                    "uploaded {} up to flush_lsn {}",
                    prepared.name, prepared.flush_lsn
                );
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
            }
            Err(e) => {
                info!("failed to upload {}: {:#}", prepared.name, e);
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
            }
        }
    }
}