Line data Source code
1 : //! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
2 : //! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
3 : //! was changed), the segment will be uploaded to S3 in about 15 minutes.
4 : //!
5 : //! The filename format for partial segments is
6 : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
7 : //! - `Segment` – the segment name, like `000000010000000000000001`
8 : //! - `Term` – current term
9 : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
10 : //! - `Commit` – commit_lsn in the same hex format
11 : //! - `NN` – safekeeper_id, like `1`
12 : //!
13 : //! The full object name example:
14 : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
15 : //!
16 : //! Each safekeeper will keep info about remote partial segments in its control
17 : //! file. Code updates state in the control file before doing any S3 operations.
18 : //! This way control file stores information about all potentially existing
19 : //! remote partial segments and can clean them up after uploading a newer version.
20 :
21 : use camino::Utf8PathBuf;
22 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
23 : use remote_storage::RemotePath;
24 : use serde::{Deserialize, Serialize};
25 :
26 : use tracing::{debug, error, info, instrument, warn};
27 : use utils::lsn::Lsn;
28 :
29 : use crate::{
30 : metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
31 : safekeeper::Term,
32 : timeline::WalResidentTimeline,
33 : timeline_manager::StateSnapshot,
34 : wal_backup::{self, remote_timeline_path},
35 : SafeKeeperConf,
36 : };
37 :
/// Lifecycle state of a remote partial segment object, as recorded in the control file.
///
/// This enum is stored (inside `State`) in the control file; changing it requires a
/// control file format version change.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum UploadStatus {
    /// Upload is in progress. This status should be used only for garbage collection;
    /// don't read data from the remote storage with this status.
    InProgress,
    /// Upload is finished. There is always at most one segment with this status.
    /// It means that the segment is current and can be used.
    Uploaded,
    /// Deletion is in progress. This status should be used only for garbage collection;
    /// don't read data from the remote storage with this status.
    Deleting,
}
50 :
/// A partial segment object in remote storage that this safekeeper created, tracked in
/// the control file so it can be garbage collected later.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PartialRemoteSegment {
    /// Upload lifecycle state of this object.
    pub status: UploadStatus,
    /// Object name relative to the timeline's remote directory, in the
    /// `Segment_Term_Flush_Commit_skNN.partial` format.
    pub name: String,
    /// commit_lsn observed when the upload was prepared (also encoded in `name`).
    pub commit_lsn: Lsn,
    /// flush_lsn observed when the upload was prepared; the object holds the segment's
    /// bytes from its start up to this LSN (also encoded in `name`).
    pub flush_lsn: Lsn,
    // We should use last_log_term here, otherwise it's possible to have inconsistent data in the
    // remote storage.
    //
    // More info here: https://github.com/neondatabase/neon/pull/8022#discussion_r1654738405
    pub term: Term,
}
63 :
64 : impl PartialRemoteSegment {
65 0 : fn eq_without_status(&self, other: &Self) -> bool {
66 0 : self.name == other.name
67 0 : && self.commit_lsn == other.commit_lsn
68 0 : && self.flush_lsn == other.flush_lsn
69 0 : && self.term == other.term
70 0 : }
71 :
72 0 : pub(crate) fn remote_path(&self, remote_timeline_path: &RemotePath) -> RemotePath {
73 0 : remote_timeline_path.join(&self.name)
74 0 : }
75 : }
76 :
// NB: these structures are a part of a control_file, you can't change them without
// changing the control file format version.

/// Partial backup state persisted in the control file: every remote partial segment
/// object this safekeeper may have created and not yet garbage collected.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct State {
    /// Known remote partial segments; normally at most one of them is `Uploaded`.
    pub segments: Vec<PartialRemoteSegment>,
}
83 :
84 : impl State {
85 : /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
86 0 : pub(crate) fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
87 0 : self.segments
88 0 : .iter()
89 0 : .find(|seg| seg.status == UploadStatus::Uploaded)
90 0 : .cloned()
91 0 : }
92 : }
93 :
94 : struct PartialBackup {
95 : wal_seg_size: usize,
96 : tli: WalResidentTimeline,
97 : conf: SafeKeeperConf,
98 : local_prefix: Utf8PathBuf,
99 : remote_timeline_path: RemotePath,
100 :
101 : state: State,
102 : }
103 :
104 : // Read-only methods for getting segment names
105 : impl PartialBackup {
106 0 : fn segno(&self, lsn: Lsn) -> XLogSegNo {
107 0 : lsn.segment_number(self.wal_seg_size)
108 0 : }
109 :
110 0 : fn segment_name(&self, segno: u64) -> String {
111 0 : XLogFileName(PG_TLI, segno, self.wal_seg_size)
112 0 : }
113 :
114 0 : fn remote_segment_name(
115 0 : &self,
116 0 : segno: u64,
117 0 : term: u64,
118 0 : commit_lsn: Lsn,
119 0 : flush_lsn: Lsn,
120 0 : ) -> String {
121 0 : format!(
122 0 : "{}_{}_{:016X}_{:016X}_sk{}.partial",
123 0 : self.segment_name(segno),
124 0 : term,
125 0 : flush_lsn.0,
126 0 : commit_lsn.0,
127 0 : self.conf.my_id.0,
128 0 : )
129 0 : }
130 :
131 0 : fn local_segment_name(&self, segno: u64) -> String {
132 0 : format!("{}.partial", self.segment_name(segno))
133 0 : }
134 : }
135 :
136 : impl PartialBackup {
137 : /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
138 0 : async fn prepare_upload(&self) -> PartialRemoteSegment {
139 : // this operation takes a lock to get the actual state
140 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
141 0 : let flush_lsn = Lsn(sk_info.flush_lsn);
142 0 : let commit_lsn = Lsn(sk_info.commit_lsn);
143 0 : let last_log_term = sk_info.last_log_term;
144 0 : let segno = self.segno(flush_lsn);
145 0 :
146 0 : let name = self.remote_segment_name(segno, last_log_term, commit_lsn, flush_lsn);
147 0 :
148 0 : PartialRemoteSegment {
149 0 : status: UploadStatus::InProgress,
150 0 : name,
151 0 : commit_lsn,
152 0 : flush_lsn,
153 0 : term: last_log_term,
154 0 : }
155 0 : }
156 :
157 : /// Reads segment from disk and uploads it to the remote storage.
158 0 : async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
159 0 : let flush_lsn = prepared.flush_lsn;
160 0 : let segno = self.segno(flush_lsn);
161 0 :
162 0 : // We're going to backup bytes from the start of the segment up to flush_lsn.
163 0 : let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
164 0 :
165 0 : let local_path = self.local_prefix.join(self.local_segment_name(segno));
166 0 : let remote_path = prepared.remote_path(&self.remote_timeline_path);
167 0 :
168 0 : // Upload first `backup_bytes` bytes of the segment to the remote storage.
169 0 : wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
170 0 : PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
171 :
172 : // We uploaded the segment, now let's verify that the data is still actual.
173 : // If the term changed, we cannot guarantee the validity of the uploaded data.
174 : // If the term is the same, we know the data is not corrupted.
175 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
176 0 : if sk_info.last_log_term != prepared.term {
177 0 : anyhow::bail!("term changed during upload");
178 0 : }
179 0 : assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
180 0 : assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
181 :
182 0 : Ok(())
183 0 : }
184 :
185 : /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
186 0 : async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
187 0 : self.tli
188 0 : .map_control_file(|cf| {
189 0 : if cf.partial_backup != self.state {
190 0 : let memory = self.state.clone();
191 0 : self.state = cf.partial_backup.clone();
192 0 : anyhow::bail!(
193 0 : "partial backup state diverged, memory={:?}, disk={:?}",
194 0 : memory,
195 0 : cf.partial_backup
196 0 : );
197 0 : }
198 0 :
199 0 : cf.partial_backup = new_state.clone();
200 0 : Ok(())
201 0 : })
202 0 : .await?;
203 : // update in-memory state
204 0 : self.state = new_state;
205 0 : Ok(())
206 0 : }
207 :
208 : /// Upload the latest version of the partial segment and garbage collect older versions.
209 0 : #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
210 : async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
211 : info!("starting upload {:?}", prepared);
212 :
213 : let state_0 = self.state.clone();
214 : let state_1 = {
215 : let mut state = state_0.clone();
216 : state.segments.push(prepared.clone());
217 : state
218 : };
219 :
220 : // we're going to upload a new segment, let's write it to disk to make GC later
221 : self.commit_state(state_1).await?;
222 :
223 : self.upload_segment(prepared.clone()).await?;
224 :
225 : let state_2 = {
226 : let mut state = state_0.clone();
227 : for seg in state.segments.iter_mut() {
228 : seg.status = UploadStatus::Deleting;
229 : }
230 : let mut actual_remote_segment = prepared.clone();
231 : actual_remote_segment.status = UploadStatus::Uploaded;
232 : state.segments.push(actual_remote_segment);
233 : state
234 : };
235 :
236 : // we've uploaded new segment, it's actual, all other segments should be GCed
237 : self.commit_state(state_2).await?;
238 : self.gc().await?;
239 :
240 : Ok(())
241 : }
242 :
243 : /// Delete all non-Uploaded segments from the remote storage. There should be only one
244 : /// Uploaded segment at a time.
245 0 : #[instrument(name = "gc", skip_all)]
246 : async fn gc(&mut self) -> anyhow::Result<()> {
247 : let mut segments_to_delete = vec![];
248 :
249 : let new_segments: Vec<PartialRemoteSegment> = self
250 : .state
251 : .segments
252 : .iter()
253 0 : .filter_map(|seg| {
254 0 : if seg.status == UploadStatus::Uploaded {
255 0 : Some(seg.clone())
256 : } else {
257 0 : segments_to_delete.push(seg.name.clone());
258 0 : None
259 : }
260 0 : })
261 : .collect();
262 :
263 : info!("deleting objects: {:?}", segments_to_delete);
264 : let mut objects_to_delete = vec![];
265 : for seg in segments_to_delete.iter() {
266 : let remote_path = self.remote_timeline_path.join(seg);
267 : objects_to_delete.push(remote_path);
268 : }
269 :
270 : // removing segments from remote storage
271 : wal_backup::delete_objects(&objects_to_delete).await?;
272 :
273 : // now we can update the state on disk
274 : let new_state = {
275 : let mut state = self.state.clone();
276 : state.segments = new_segments;
277 : state
278 : };
279 : self.commit_state(new_state).await?;
280 :
281 : Ok(())
282 : }
283 : }
284 :
285 : /// Check if everything is uploaded and partial backup task doesn't need to run.
286 0 : pub(crate) fn needs_uploading(
287 0 : state: &StateSnapshot,
288 0 : uploaded: &Option<PartialRemoteSegment>,
289 0 : ) -> bool {
290 0 : match uploaded {
291 0 : Some(uploaded) => {
292 0 : uploaded.status != UploadStatus::Uploaded
293 0 : || uploaded.flush_lsn != state.flush_lsn
294 0 : || uploaded.commit_lsn != state.commit_lsn
295 0 : || uploaded.term != state.last_log_term
296 : }
297 0 : None => true,
298 : }
299 0 : }
300 :
/// Main task for partial backup. It waits for the flush_lsn to change and then uploads the
/// partial segment to the remote storage. It also does garbage collection of old segments.
///
/// When there is nothing more to do and the last segment was successfully uploaded, the task
/// returns PartialRemoteSegment, to signal readiness for offloading the timeline.
///
/// Returns `None` if the timeline is cancelled or the remote timeline path cannot be built.
/// Upload failures are logged and counted under the `error` label of
/// `PARTIAL_BACKUP_UPLOADS`; they do not end the task, the loop simply tries again.
#[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
pub async fn main_task(
    tli: WalResidentTimeline,
    conf: SafeKeeperConf,
) -> Option<PartialRemoteSegment> {
    debug!("started");
    // How long the segment number must stay unchanged before we upload.
    let await_duration = conf.partial_backup_timeout;

    let (_, persistent_state) = tli.get_state().await;
    let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
    let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
    let wal_seg_size = tli.get_wal_seg_size().await;

    let local_prefix = tli.get_timeline_dir();
    let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
        Ok(path) => path,
        Err(e) => {
            error!("failed to create remote path: {:?}", e);
            return None;
        }
    };

    let mut backup = PartialBackup {
        wal_seg_size,
        tli,
        state: persistent_state.partial_backup,
        conf,
        local_prefix,
        remote_timeline_path,
    };

    debug!("state: {:?}", backup.state);

    // The general idea is that each safekeeper keeps only one partial segment
    // both in remote storage and in local state. If this is not true, something
    // went wrong.
    const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;

    'outer: loop {
        // Safety net: if stale entries piled up (e.g. InProgress entries left by failed
        // uploads), try to clean them up first. GC errors are only logged.
        if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
            warn!(
                "too many segments in control_file state, running gc: {}",
                backup.state.segments.len()
            );

            backup.gc().await.unwrap_or_else(|e| {
                error!("failed to run gc: {:#}", e);
            });
        }

        // wait until we have something to upload
        let uploaded_segment = backup.state.uploaded_segment();
        if let Some(seg) = &uploaded_segment {
            // check if uploaded segment matches the current state
            if flush_lsn_rx.borrow().lsn == seg.flush_lsn
                && *commit_lsn_rx.borrow() == seg.commit_lsn
                && flush_lsn_rx.borrow().term == seg.term
            {
                // we have nothing to do, the last segment is already uploaded
                return Some(seg.clone());
            }
        }

        // if we don't have any data and zero LSNs, wait for something
        while flush_lsn_rx.borrow().lsn == Lsn(0) {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                _ = flush_lsn_rx.changed() => {}
            }
        }

        // fixing the segno and waiting some time to prevent reuploading the same segment too often
        let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
        let timeout = tokio::time::sleep(await_duration);
        tokio::pin!(timeout);
        let mut timeout_expired = false;

        // waiting until timeout expires OR segno changes
        'inner: loop {
            tokio::select! {
                _ = backup.tli.cancel.cancelled() => {
                    info!("timeline canceled");
                    return None;
                }
                // commit_lsn updates only wake the loop; they neither restart the
                // timer nor abort the wait.
                _ = commit_lsn_rx.changed() => {}
                _ = flush_lsn_rx.changed() => {
                    let segno = backup.segno(flush_lsn_rx.borrow().lsn);
                    if segno != pending_segno {
                        // previous segment is no longer partial, aborting the wait
                        break 'inner;
                    }
                }
                _ = &mut timeout => {
                    // timeout expired, now we are ready for upload
                    timeout_expired = true;
                    break 'inner;
                }
            }
        }

        if !timeout_expired {
            // likely segno has changed, let's try again in the next iteration
            continue 'outer;
        }

        // Read the actual state (prepare_upload takes the timeline lock); the watch
        // values above only decide when to wake up.
        let prepared = backup.prepare_upload().await;
        if let Some(seg) = &uploaded_segment {
            if seg.eq_without_status(&prepared) {
                // we already uploaded this segment, nothing to do
                continue 'outer;
            }
        }

        match backup.do_upload(&prepared).await {
            Ok(()) => {
                debug!(
                    "uploaded {} up to flush_lsn {}",
                    prepared.name, prepared.flush_lsn
                );
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
            }
            Err(e) => {
                // Not fatal: the outer loop will wait and try again.
                info!("failed to upload {}: {:#}", prepared.name, e);
                PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
            }
        }
    }
}
|