Line data Source code
1 : //! Safekeeper timeline has a background task which is subscribed to `commit_lsn`
2 : //! and `flush_lsn` updates. After the partial segment was updated (`flush_lsn`
3 : //! was changed), the segment will be uploaded to S3 in about 15 minutes.
4 : //!
5 : //! The filename format for partial segments is
6 : //! `Segment_Term_Flush_Commit_skNN.partial`, where:
7 : //! - `Segment` – the segment name, like `000000010000000000000001`
8 : //! - `Term` – current term
9 : //! - `Flush` – flush_lsn in hex format `{:016X}`, e.g. `00000000346BC568`
10 : //! - `Commit` – commit_lsn in the same hex format
11 : //! - `NN` – safekeeper_id, like `1`
12 : //!
13 : //! The full object name example:
14 : //! `000000010000000000000002_2_0000000002534868_0000000002534410_sk1.partial`
15 : //!
16 : //! Each safekeeper will keep info about remote partial segments in its control
17 : //! file. Code updates state in the control file before doing any S3 operations.
18 : //! This way control file stores information about all potentially existing
19 : //! remote partial segments and can clean them up after uploading a newer version.
20 :
21 : use camino::Utf8PathBuf;
22 : use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI};
23 : use remote_storage::RemotePath;
24 : use serde::{Deserialize, Serialize};
25 :
26 : use tracing::{debug, error, info, instrument, warn};
27 : use utils::lsn::Lsn;
28 :
29 : use crate::{
30 : metrics::{PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS},
31 : safekeeper::Term,
32 : timeline::FullAccessTimeline,
33 : wal_backup::{self, remote_timeline_path},
34 : SafeKeeperConf,
35 : };
36 :
37 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38 : pub enum UploadStatus {
39 : /// Upload is in progress
40 : InProgress,
41 : /// Upload is finished
42 : Uploaded,
43 : /// Deletion is in progress
44 : Deleting,
45 : }
46 :
47 0 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
48 : pub struct PartialRemoteSegment {
49 : pub status: UploadStatus,
50 : pub name: String,
51 : pub commit_lsn: Lsn,
52 : pub flush_lsn: Lsn,
53 : pub term: Term,
54 : }
55 :
56 : impl PartialRemoteSegment {
57 0 : fn eq_without_status(&self, other: &Self) -> bool {
58 0 : self.name == other.name
59 0 : && self.commit_lsn == other.commit_lsn
60 0 : && self.flush_lsn == other.flush_lsn
61 0 : && self.term == other.term
62 0 : }
63 : }
64 :
65 : // NB: these structures are a part of a control_file, you can't change them without
66 : // changing the control file format version.
67 6 : #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
68 : pub struct State {
69 : pub segments: Vec<PartialRemoteSegment>,
70 : }
71 :
72 : impl State {
73 : /// Find an Uploaded segment. There should be only one Uploaded segment at a time.
74 0 : fn uploaded_segment(&self) -> Option<PartialRemoteSegment> {
75 0 : self.segments
76 0 : .iter()
77 0 : .find(|seg| seg.status == UploadStatus::Uploaded)
78 0 : .cloned()
79 0 : }
80 : }
81 :
82 : struct PartialBackup {
83 : wal_seg_size: usize,
84 : tli: FullAccessTimeline,
85 : conf: SafeKeeperConf,
86 : local_prefix: Utf8PathBuf,
87 : remote_timeline_path: RemotePath,
88 :
89 : state: State,
90 : }
91 :
92 : // Read-only methods for getting segment names
93 : impl PartialBackup {
94 0 : fn segno(&self, lsn: Lsn) -> XLogSegNo {
95 0 : lsn.segment_number(self.wal_seg_size)
96 0 : }
97 :
98 0 : fn segment_name(&self, segno: u64) -> String {
99 0 : XLogFileName(PG_TLI, segno, self.wal_seg_size)
100 0 : }
101 :
102 0 : fn remote_segment_name(
103 0 : &self,
104 0 : segno: u64,
105 0 : term: u64,
106 0 : commit_lsn: Lsn,
107 0 : flush_lsn: Lsn,
108 0 : ) -> String {
109 0 : format!(
110 0 : "{}_{}_{:016X}_{:016X}_sk{}.partial",
111 0 : self.segment_name(segno),
112 0 : term,
113 0 : flush_lsn.0,
114 0 : commit_lsn.0,
115 0 : self.conf.my_id.0,
116 0 : )
117 0 : }
118 :
119 0 : fn local_segment_name(&self, segno: u64) -> String {
120 0 : format!("{}.partial", self.segment_name(segno))
121 0 : }
122 : }
123 :
124 : impl PartialBackup {
125 : /// Takes a lock to read actual safekeeper state and returns a segment that should be uploaded.
126 0 : async fn prepare_upload(&self) -> PartialRemoteSegment {
127 : // this operation takes a lock to get the actual state
128 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
129 0 : let flush_lsn = Lsn(sk_info.flush_lsn);
130 0 : let commit_lsn = Lsn(sk_info.commit_lsn);
131 0 : let term = sk_info.term;
132 0 : let segno = self.segno(flush_lsn);
133 0 :
134 0 : let name = self.remote_segment_name(segno, term, commit_lsn, flush_lsn);
135 0 :
136 0 : PartialRemoteSegment {
137 0 : status: UploadStatus::InProgress,
138 0 : name,
139 0 : commit_lsn,
140 0 : flush_lsn,
141 0 : term,
142 0 : }
143 0 : }
144 :
145 : /// Reads segment from disk and uploads it to the remote storage.
146 0 : async fn upload_segment(&mut self, prepared: PartialRemoteSegment) -> anyhow::Result<()> {
147 0 : let flush_lsn = prepared.flush_lsn;
148 0 : let segno = self.segno(flush_lsn);
149 0 :
150 0 : // We're going to backup bytes from the start of the segment up to flush_lsn.
151 0 : let backup_bytes = flush_lsn.segment_offset(self.wal_seg_size);
152 0 :
153 0 : let local_path = self.local_prefix.join(self.local_segment_name(segno));
154 0 : let remote_path = self.remote_timeline_path.join(&prepared.name);
155 0 :
156 0 : // Upload first `backup_bytes` bytes of the segment to the remote storage.
157 0 : wal_backup::backup_partial_segment(&local_path, &remote_path, backup_bytes).await?;
158 0 : PARTIAL_BACKUP_UPLOADED_BYTES.inc_by(backup_bytes as u64);
159 :
160 : // We uploaded the segment, now let's verify that the data is still actual.
161 : // If the term changed, we cannot guarantee the validity of the uploaded data.
162 : // If the term is the same, we know the data is not corrupted.
163 0 : let sk_info = self.tli.get_safekeeper_info(&self.conf).await;
164 0 : if sk_info.term != prepared.term {
165 0 : anyhow::bail!("term changed during upload");
166 0 : }
167 0 : assert!(prepared.commit_lsn <= Lsn(sk_info.commit_lsn));
168 0 : assert!(prepared.flush_lsn <= Lsn(sk_info.flush_lsn));
169 :
170 0 : Ok(())
171 0 : }
172 :
173 : /// Write new state to disk. If in-memory and on-disk states diverged, returns an error.
174 0 : async fn commit_state(&mut self, new_state: State) -> anyhow::Result<()> {
175 0 : self.tli
176 0 : .map_control_file(|cf| {
177 0 : if cf.partial_backup != self.state {
178 0 : let memory = self.state.clone();
179 0 : self.state = cf.partial_backup.clone();
180 0 : anyhow::bail!(
181 0 : "partial backup state diverged, memory={:?}, disk={:?}",
182 0 : memory,
183 0 : cf.partial_backup
184 0 : );
185 0 : }
186 0 :
187 0 : cf.partial_backup = new_state.clone();
188 0 : Ok(())
189 0 : })
190 0 : .await?;
191 : // update in-memory state
192 0 : self.state = new_state;
193 0 : Ok(())
194 0 : }
195 :
196 : /// Upload the latest version of the partial segment and garbage collect older versions.
197 0 : #[instrument(name = "upload", skip_all, fields(name = %prepared.name))]
198 : async fn do_upload(&mut self, prepared: &PartialRemoteSegment) -> anyhow::Result<()> {
199 : info!("starting upload {:?}", prepared);
200 :
201 : let state_0 = self.state.clone();
202 : let state_1 = {
203 : let mut state = state_0.clone();
204 : state.segments.push(prepared.clone());
205 : state
206 : };
207 :
208 : // we're going to upload a new segment, let's write it to disk to make GC later
209 : self.commit_state(state_1).await?;
210 :
211 : self.upload_segment(prepared.clone()).await?;
212 :
213 : let state_2 = {
214 : let mut state = state_0.clone();
215 : for seg in state.segments.iter_mut() {
216 : seg.status = UploadStatus::Deleting;
217 : }
218 : let mut actual_remote_segment = prepared.clone();
219 : actual_remote_segment.status = UploadStatus::Uploaded;
220 : state.segments.push(actual_remote_segment);
221 : state
222 : };
223 :
224 : // we've uploaded new segment, it's actual, all other segments should be GCed
225 : self.commit_state(state_2).await?;
226 : self.gc().await?;
227 :
228 : Ok(())
229 : }
230 :
231 : /// Delete all non-Uploaded segments from the remote storage. There should be only one
232 : /// Uploaded segment at a time.
233 0 : #[instrument(name = "gc", skip_all)]
234 : async fn gc(&mut self) -> anyhow::Result<()> {
235 : let mut segments_to_delete = vec![];
236 :
237 : let new_segments: Vec<PartialRemoteSegment> = self
238 : .state
239 : .segments
240 : .iter()
241 0 : .filter_map(|seg| {
242 0 : if seg.status == UploadStatus::Uploaded {
243 0 : Some(seg.clone())
244 : } else {
245 0 : segments_to_delete.push(seg.name.clone());
246 0 : None
247 : }
248 0 : })
249 : .collect();
250 :
251 : info!("deleting objects: {:?}", segments_to_delete);
252 : let mut objects_to_delete = vec![];
253 : for seg in segments_to_delete.iter() {
254 : let remote_path = self.remote_timeline_path.join(seg);
255 : objects_to_delete.push(remote_path);
256 : }
257 :
258 : // removing segments from remote storage
259 : wal_backup::delete_objects(&objects_to_delete).await?;
260 :
261 : // now we can update the state on disk
262 : let new_state = {
263 : let mut state = self.state.clone();
264 : state.segments = new_segments;
265 : state
266 : };
267 : self.commit_state(new_state).await?;
268 :
269 : Ok(())
270 : }
271 : }
272 :
273 0 : #[instrument(name = "Partial backup", skip_all, fields(ttid = %tli.ttid))]
274 : pub async fn main_task(tli: FullAccessTimeline, conf: SafeKeeperConf) {
275 : debug!("started");
276 : let await_duration = conf.partial_backup_timeout;
277 :
278 : let (_, persistent_state) = tli.get_state().await;
279 : let mut commit_lsn_rx = tli.get_commit_lsn_watch_rx();
280 : let mut flush_lsn_rx = tli.get_term_flush_lsn_watch_rx();
281 : let wal_seg_size = tli.get_wal_seg_size().await;
282 :
283 : let local_prefix = tli.get_timeline_dir();
284 : let remote_timeline_path = match remote_timeline_path(&tli.ttid) {
285 : Ok(path) => path,
286 : Err(e) => {
287 : error!("failed to create remote path: {:?}", e);
288 : return;
289 : }
290 : };
291 :
292 : let mut backup = PartialBackup {
293 : wal_seg_size,
294 : tli,
295 : state: persistent_state.partial_backup,
296 : conf,
297 : local_prefix,
298 : remote_timeline_path,
299 : };
300 :
301 : debug!("state: {:?}", backup.state);
302 :
303 : // The general idea is that each safekeeper keeps only one partial segment
304 : // both in remote storage and in local state. If this is not true, something
305 : // went wrong.
306 : const MAX_SIMULTANEOUS_SEGMENTS: usize = 10;
307 :
308 : 'outer: loop {
309 : if backup.state.segments.len() > MAX_SIMULTANEOUS_SEGMENTS {
310 : warn!(
311 : "too many segments in control_file state, running gc: {}",
312 : backup.state.segments.len()
313 : );
314 :
315 0 : backup.gc().await.unwrap_or_else(|e| {
316 0 : error!("failed to run gc: {:#}", e);
317 0 : });
318 : }
319 :
320 : // wait until we have something to upload
321 : let uploaded_segment = backup.state.uploaded_segment();
322 : if let Some(seg) = &uploaded_segment {
323 : // if we already uploaded something, wait until we have something new
324 : while flush_lsn_rx.borrow().lsn == seg.flush_lsn
325 : && *commit_lsn_rx.borrow() == seg.commit_lsn
326 : && flush_lsn_rx.borrow().term == seg.term
327 : {
328 : tokio::select! {
329 : _ = backup.tli.cancel.cancelled() => {
330 : info!("timeline canceled");
331 : return;
332 : }
333 : _ = commit_lsn_rx.changed() => {}
334 : _ = flush_lsn_rx.changed() => {}
335 : }
336 : }
337 : }
338 :
339 : // if we don't have any data and zero LSNs, wait for something
340 : while flush_lsn_rx.borrow().lsn == Lsn(0) {
341 : tokio::select! {
342 : _ = backup.tli.cancel.cancelled() => {
343 : info!("timeline canceled");
344 : return;
345 : }
346 : _ = flush_lsn_rx.changed() => {}
347 : }
348 : }
349 :
350 : // fixing the segno and waiting some time to prevent reuploading the same segment too often
351 : let pending_segno = backup.segno(flush_lsn_rx.borrow().lsn);
352 : let timeout = tokio::time::sleep(await_duration);
353 : tokio::pin!(timeout);
354 : let mut timeout_expired = false;
355 :
356 : // waiting until timeout expires OR segno changes
357 : 'inner: loop {
358 : tokio::select! {
359 : _ = backup.tli.cancel.cancelled() => {
360 : info!("timeline canceled");
361 : return;
362 : }
363 : _ = commit_lsn_rx.changed() => {}
364 : _ = flush_lsn_rx.changed() => {
365 : let segno = backup.segno(flush_lsn_rx.borrow().lsn);
366 : if segno != pending_segno {
367 : // previous segment is no longer partial, aborting the wait
368 : break 'inner;
369 : }
370 : }
371 : _ = &mut timeout => {
372 : // timeout expired, now we are ready for upload
373 : timeout_expired = true;
374 : break 'inner;
375 : }
376 : }
377 : }
378 :
379 : if !timeout_expired {
380 : // likely segno has changed, let's try again in the next iteration
381 : continue 'outer;
382 : }
383 :
384 : let prepared = backup.prepare_upload().await;
385 : if let Some(seg) = &uploaded_segment {
386 : if seg.eq_without_status(&prepared) {
387 : // we already uploaded this segment, nothing to do
388 : continue 'outer;
389 : }
390 : }
391 :
392 : match backup.do_upload(&prepared).await {
393 : Ok(()) => {
394 : debug!(
395 : "uploaded {} up to flush_lsn {}",
396 : prepared.name, prepared.flush_lsn
397 : );
398 : PARTIAL_BACKUP_UPLOADS.with_label_values(&["ok"]).inc();
399 : }
400 : Err(e) => {
401 : info!("failed to upload {}: {:#}", prepared.name, e);
402 : PARTIAL_BACKUP_UPLOADS.with_label_values(&["error"]).inc();
403 : }
404 : }
405 : }
406 : }
|