Line data Source code
1 : //! This module has everything to deal with WAL -- reading and writing to disk.
2 : //!
3 : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
4 : //! PG timeline is always 1, so WAL segments usually have names like this:
5 : //! - 000000010000000000000001
6 : //! - 000000010000000000000002.partial
7 : //!
8 : //! Note that the last file has a `.partial` suffix, which is different from postgres.
9 :
10 : use std::cmp::{max, min};
11 : use std::future::Future;
12 : use std::io::{ErrorKind, SeekFrom};
13 : use std::pin::Pin;
14 :
15 : use anyhow::{Context, Result, bail};
16 : use bytes::Bytes;
17 : use camino::{Utf8Path, Utf8PathBuf};
18 : use futures::future::BoxFuture;
19 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
20 : use postgres_ffi::waldecoder::WalStreamDecoder;
21 : use postgres_ffi::{PG_TLI, XLogFileName, XLogSegNo, dispatch_pgversion};
22 : use postgres_versioninfo::{PgMajorVersion, PgVersionId};
23 : use pq_proto::SystemId;
24 : use remote_storage::RemotePath;
25 : use std::sync::Arc;
26 : use tokio::fs::{self, File, OpenOptions, remove_file};
27 : use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
28 : use tracing::*;
29 : use utils::crashsafe::durable_rename;
30 : use utils::id::TenantTimelineId;
31 : use utils::lsn::Lsn;
32 :
33 : use crate::metrics::{
34 : REMOVED_WAL_SEGMENTS, WAL_DISK_IO_ERRORS, WAL_STORAGE_OPERATION_SECONDS, WalStorageMetrics,
35 : time_io_closure,
36 : };
37 : use crate::state::TimelinePersistentState;
38 : use crate::wal_backup::{WalBackup, read_object, remote_timeline_path};
39 :
40 : pub trait Storage {
41 : // Last written LSN.
42 : fn write_lsn(&self) -> Lsn;
43 : /// LSN of last durably stored WAL record.
44 : fn flush_lsn(&self) -> Lsn;
45 :
46 : /// Initialize segment by creating proper long header at the beginning of
47 : /// the segment and short header at the page of given LSN. This is only used
48 : /// for timeline initialization because compute will stream data only since
49 : /// init_lsn. Other segment headers are included in compute stream.
50 : fn initialize_first_segment(
51 : &mut self,
52 : init_lsn: Lsn,
53 : ) -> impl Future<Output = Result<()>> + Send;
54 :
55 : /// Write piece of WAL from buf to disk, but not necessarily sync it.
56 : fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> impl Future<Output = Result<()>> + Send;
57 :
58 : /// Truncate WAL at specified LSN, which must be the end of WAL record.
59 : fn truncate_wal(&mut self, end_pos: Lsn) -> impl Future<Output = Result<()>> + Send;
60 :
61 : /// Durably store WAL on disk, up to the last written WAL record.
62 : fn flush_wal(&mut self) -> impl Future<Output = Result<()>> + Send;
63 :
64 : /// Remove all segments <= given segno. Returns function doing that as we
65 : /// want to perform it without timeline lock.
66 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
67 :
68 : /// Release resources associated with the storage -- technically, close FDs.
69 : /// Currently we don't remove timelines until restart (#3146), so need to
70 : /// spare descriptors. This would be useful for temporary tli detach as
71 : /// well.
72 0 : fn close(&mut self) {}
73 :
74 : /// Get metrics for this timeline.
75 : fn get_metrics(&self) -> WalStorageMetrics;
76 : }
77 :
78 : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
79 : /// for better performance. Storage is initialized in the constructor.
80 : ///
81 : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
82 : /// its filename and may be not fully flushed.
83 : ///
84 : /// Relationship of LSNs:
85 : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
86 : ///
87 : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
88 : pub struct PhysicalStorage {
89 : metrics: WalStorageMetrics,
90 : timeline_dir: Utf8PathBuf,
91 :
92 : /// Disables fsync if true.
93 : no_sync: bool,
94 :
95 : /// Size of WAL segment in bytes.
96 : wal_seg_size: usize,
97 : pg_version: PgVersionId,
98 : system_id: u64,
99 :
100 : /// Written to disk, but possibly still in the cache and not fully persisted.
101 : /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
102 : write_lsn: Lsn,
103 :
104 : /// The LSN of the last WAL record written to disk. Still can be not fully
105 : /// flushed.
106 : ///
107 : /// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
108 : /// switch ingest the reverse is true because we don't bump write_lsn up to
109 : /// the next segment: WAL stream from the compute doesn't have the gap and
110 : /// for simplicity / as a sanity check we disallow any non-sequential
111 : /// writes, so write zeros as is.
112 : ///
113 : /// Similar effect is in theory possible due to LSN alignment: if record
114 : /// ends at *2, decoder will report end lsn as *8 even though we haven't
115 : /// written these zeros yet. In practice compute likely never sends
116 : /// non-aligned chunks of data.
117 : write_record_lsn: Lsn,
118 :
119 : /// The last LSN flushed to disk. May be in the middle of a record.
120 : ///
121 : /// NB: when the rest of the system refers to `flush_lsn`, it usually
122 : /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
123 : /// and should be resolved.
124 : flush_lsn: Lsn,
125 :
126 : /// The LSN of the last WAL record flushed to disk.
127 : flush_record_lsn: Lsn,
128 :
129 : /// Decoder is required for detecting boundaries of WAL records.
130 : decoder: WalStreamDecoder,
131 :
132 : /// Cached open file for the last segment.
133 : ///
134 : /// If Some(file) is open, then it always:
135 : /// - has ".partial" suffix
136 : /// - points to write_lsn, so no seek is needed for writing
137 : /// - doesn't point to the end of the segment
138 : file: Option<File>,
139 :
140 : /// When true, WAL truncation potentially has been interrupted and we need
141 : /// to finish it before allowing WAL writes; see truncate_wal for details.
142 : /// In this case [`write_lsn`] can be less than actually written WAL on
143 : /// disk. In particular, there can be a case with unexpected .partial file.
144 : ///
145 : /// Imagine the following:
146 : /// - 000000010000000000000001
147 : /// - it was fully written, but the last record is split between 2
148 : /// segments
149 : /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in
150 : /// the end of this segment
151 : /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were
152 : /// initialized to 0/1FFFFF0
153 : /// - 000000010000000000000002.partial
154 : /// - it has only 1 byte written, which is not enough to make a full WAL
155 : /// record
156 : ///
157 : /// Partial segment 002 has no WAL records, and it will be removed by the
158 : /// next truncate_wal(). This flag will be set to false after the first
159 : /// successful truncate_wal() call.
160 : ///
161 : /// [`write_lsn`]: Self::write_lsn
162 : pending_wal_truncation: bool,
163 : }
164 :
165 : impl PhysicalStorage {
166 : /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
167 : /// the disk. Otherwise, all LSNs are set to zero.
168 5 : pub fn new(
169 5 : ttid: &TenantTimelineId,
170 5 : timeline_dir: &Utf8Path,
171 5 : state: &TimelinePersistentState,
172 5 : no_sync: bool,
173 5 : ) -> Result<PhysicalStorage> {
174 5 : let wal_seg_size = state.server.wal_seg_size as usize;
175 :
176 : // Find out where stored WAL ends, starting at commit_lsn which is a
177 : // known recent record boundary (unless we don't have WAL at all).
178 : //
179 : // NB: find_end_of_wal MUST be backwards compatible with the previously
180 : // written WAL. If find_end_of_wal fails to read any WAL written by an
181 : // older version of the code, we could lose data forever.
182 5 : let write_lsn = if state.commit_lsn == Lsn(0) {
183 5 : Lsn(0)
184 : } else {
185 0 : let version = PgMajorVersion::try_from(state.server.pg_version).unwrap();
186 :
187 0 : dispatch_pgversion!(
188 0 : version,
189 0 : pgv::xlog_utils::find_end_of_wal(
190 0 : timeline_dir.as_std_path(),
191 0 : wal_seg_size,
192 0 : state.commit_lsn,
193 0 : )?,
194 0 : bail!("unsupported postgres version: {}", version)
195 : )
196 : };
197 :
198 : // note: this assumes we fsync'ed whole datadir on start.
199 5 : let flush_lsn = write_lsn;
200 :
201 5 : debug!(
202 0 : "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
203 : ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
204 : );
205 5 : if flush_lsn < state.commit_lsn {
206 : // note: can never happen. find_end_of_wal returns provided start_lsn
207 : // (state.commit_lsn in our case) if it doesn't find anything.
208 0 : bail!(
209 0 : "timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file",
210 : ttid.timeline_id,
211 : flush_lsn,
212 : state.commit_lsn
213 : );
214 5 : }
215 5 : if flush_lsn < state.peer_horizon_lsn {
216 0 : warn!(
217 0 : "timeline {}: flush_lsn {} is less than cfile peer_horizon_lsn {}",
218 : ttid.timeline_id, flush_lsn, state.peer_horizon_lsn
219 : );
220 5 : }
221 :
222 5 : Ok(PhysicalStorage {
223 5 : metrics: WalStorageMetrics::default(),
224 5 : timeline_dir: timeline_dir.to_path_buf(),
225 5 : no_sync,
226 5 : wal_seg_size,
227 5 : pg_version: state.server.pg_version,
228 5 : system_id: state.server.system_id,
229 5 : write_lsn,
230 5 : write_record_lsn: write_lsn,
231 5 : flush_lsn,
232 5 : flush_record_lsn: flush_lsn,
233 5 : decoder: WalStreamDecoder::new(
234 5 : write_lsn,
235 5 : PgMajorVersion::try_from(state.server.pg_version).unwrap(),
236 5 : ),
237 5 : file: None,
238 5 : pending_wal_truncation: true,
239 5 : })
240 5 : }
241 :
242 : /// Get all known state of the storage.
243 0 : pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
244 0 : (
245 0 : self.write_lsn,
246 0 : self.write_record_lsn,
247 0 : self.flush_record_lsn,
248 0 : self.file.is_some(),
249 0 : )
250 0 : }
251 :
252 : /// Call fsync if config requires so.
253 5 : async fn fsync_file(&mut self, file: &File) -> Result<()> {
254 5 : if !self.no_sync {
255 5 : self.metrics
256 5 : .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
257 0 : }
258 5 : Ok(())
259 5 : }
260 :
261 : /// Call fdatasync if config requires so.
262 620 : async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
263 620 : if !self.no_sync {
264 620 : self.metrics
265 620 : .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
266 0 : }
267 620 : Ok(())
268 620 : }
269 :
270 : /// Open or create WAL segment file. Caller must call seek to the wanted position.
271 : /// Returns `file` and `is_partial`.
272 15 : async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
273 15 : let (wal_file_path, wal_file_partial_path) =
274 15 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
275 :
276 : // Try to open already completed segment
277 15 : if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
278 0 : Ok((file, false))
279 15 : } else if let Ok(file) = OpenOptions::new()
280 15 : .write(true)
281 15 : .open(&wal_file_partial_path)
282 15 : .await
283 : {
284 : // Try to open existing partial file
285 10 : Ok((file, true))
286 : } else {
287 5 : let _timer = WAL_STORAGE_OPERATION_SECONDS
288 5 : .with_label_values(&["initialize_segment"])
289 5 : .start_timer();
290 : // Create and fill new partial file
291 : //
292 : // We're using fdatasync during WAL writing, so file size must not
293 : // change; to this end it is filled with zeros here. To avoid using
294 : // half initialized segment, first bake it under tmp filename and
295 : // then rename.
296 5 : let tmp_path = self.timeline_dir.join("waltmp");
297 5 : let file: File = File::create(&tmp_path).await.with_context(|| {
298 : /* BEGIN_HADRON */
299 0 : WAL_DISK_IO_ERRORS.inc();
300 : /* END_HADRON */
301 0 : format!("Failed to open tmp wal file {:?}", &tmp_path)
302 0 : })?;
303 :
304 5 : fail::fail_point!("sk-zero-segment", |_| {
305 0 : info!("sk-zero-segment failpoint hit");
306 0 : Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
307 0 : });
308 5 : file.set_len(self.wal_seg_size as u64).await?;
309 :
310 5 : if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
311 : // Probably rename succeeded, but fsync of it failed. Remove
312 : // the file then to avoid using it.
313 0 : remove_file(wal_file_partial_path)
314 0 : .await
315 0 : .or_else(utils::fs_ext::ignore_not_found)?;
316 0 : return Err(e.into());
317 5 : }
318 5 : Ok((file, true))
319 : }
320 15 : }
321 :
322 : /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
323 : /// segment was completed, closed, and flushed to disk.
324 620 : async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
325 620 : let mut file = if let Some(file) = self.file.take() {
326 615 : file
327 : } else {
328 5 : let (mut file, is_partial) = self.open_or_create(segno).await?;
329 5 : assert!(is_partial, "unexpected write into non-partial segment file");
330 5 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
331 5 : file
332 : };
333 :
334 620 : file.write_all(buf).await?;
335 : // Note: flush just ensures write above reaches the OS (this is not
336 : // needed in case of sync IO as Write::write there calls directly write
337 : // syscall, but needed in case of async). It does *not* fsyncs the file.
338 620 : file.flush().await?;
339 :
340 620 : if xlogoff + buf.len() == self.wal_seg_size {
341 : // If we reached the end of a WAL segment, flush and close it.
342 0 : self.fdatasync_file(&file).await?;
343 :
344 : // Rename partial file to completed file
345 0 : let (wal_file_path, wal_file_partial_path) =
346 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
347 0 : fs::rename(wal_file_partial_path, wal_file_path).await?;
348 0 : Ok(true)
349 : } else {
350 : // otherwise, file can be reused later
351 620 : self.file = Some(file);
352 620 : Ok(false)
353 : }
354 620 : }
355 :
356 : /// Writes WAL to the segment files, until everything is writed. If some segments
357 : /// are fully written, they are flushed to disk. The last (partial) segment can
358 : /// be flushed separately later.
359 : ///
360 : /// Updates `write_lsn` and `flush_lsn`.
361 620 : async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
362 : // TODO: this shouldn't be possible, except possibly with write_lsn == 0.
363 : // Rename this method to `append_exact`, and make it append-only, removing
364 : // the `pos` parameter and this check. For this reason, we don't update
365 : // `flush_lsn` here.
366 620 : if self.write_lsn != pos {
367 : // need to flush the file before discarding it
368 0 : if let Some(file) = self.file.take() {
369 0 : self.fdatasync_file(&file).await?;
370 0 : }
371 :
372 0 : self.write_lsn = pos;
373 620 : }
374 :
375 1240 : while !buf.is_empty() {
376 : // Extract WAL location for this block
377 620 : let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
378 620 : let segno = self.write_lsn.segment_number(self.wal_seg_size);
379 :
380 : // If crossing a WAL boundary, only write up until we reach wal segment size.
381 620 : let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
382 0 : self.wal_seg_size - xlogoff
383 : } else {
384 620 : buf.len()
385 : };
386 :
387 620 : let flushed = self
388 620 : .write_in_segment(segno, xlogoff, &buf[..bytes_write])
389 620 : .await
390 : /* BEGIN_HADRON */
391 620 : .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
392 : /* END_HADRON */
393 :
394 620 : self.write_lsn += bytes_write as u64;
395 620 : if flushed {
396 0 : self.flush_lsn = self.write_lsn;
397 620 : }
398 620 : buf = &buf[bytes_write..];
399 : }
400 :
401 620 : Ok(())
402 620 : }
403 : }
404 :
405 : impl Storage for PhysicalStorage {
406 : // Last written LSN.
407 620 : fn write_lsn(&self) -> Lsn {
408 620 : self.write_lsn
409 620 : }
410 : /// flush_lsn returns LSN of last durably stored WAL record.
411 : ///
412 : /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
413 : #[allow(clippy::misnamed_getters)]
414 5161 : fn flush_lsn(&self) -> Lsn {
415 5161 : self.flush_record_lsn
416 5161 : }
417 :
418 5 : async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
419 5 : let _timer = WAL_STORAGE_OPERATION_SECONDS
420 5 : .with_label_values(&["initialize_first_segment"])
421 5 : .start_timer();
422 :
423 5 : let segno = init_lsn.segment_number(self.wal_seg_size);
424 5 : let (mut file, _) = self.open_or_create(segno).await?;
425 5 : let major_pg_version = PgMajorVersion::try_from(self.pg_version).unwrap();
426 5 : let wal_seg =
427 5 : postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
428 5 : file.seek(SeekFrom::Start(0)).await?;
429 5 : file.write_all(&wal_seg).await?;
430 5 : file.flush().await?;
431 5 : info!("initialized segno {} at lsn {}", segno, init_lsn);
432 : // note: file is *not* fsynced
433 5 : Ok(())
434 5 : }
435 :
436 : /// Write WAL to disk.
437 620 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
438 : // Disallow any non-sequential writes, which can result in gaps or overwrites.
439 : // If we need to move the pointer, use truncate_wal() instead.
440 620 : if self.write_lsn > startpos {
441 0 : bail!(
442 0 : "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
443 : self.write_lsn,
444 : startpos
445 : );
446 620 : }
447 620 : if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
448 0 : bail!(
449 0 : "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
450 : self.write_lsn,
451 : startpos
452 : );
453 620 : }
454 620 : if self.pending_wal_truncation {
455 0 : bail!(
456 0 : "write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
457 : self.write_lsn,
458 : startpos
459 : );
460 620 : }
461 :
462 620 : let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
463 : // WAL is written, updating write metrics
464 620 : self.metrics.observe_write_seconds(write_seconds);
465 620 : self.metrics.observe_write_bytes(buf.len());
466 :
467 : // Figure out the last record's end LSN and update `write_record_lsn`
468 : // (if we got a whole record). The write may also have closed and
469 : // flushed a segment, so update `flush_record_lsn` as well.
470 620 : if self.decoder.available() != startpos {
471 5 : info!(
472 0 : "restart decoder from {} to {}",
473 0 : self.decoder.available(),
474 : startpos,
475 : );
476 5 : let pg_version = self.decoder.pg_version;
477 5 : self.decoder = WalStreamDecoder::new(startpos, pg_version);
478 615 : }
479 620 : self.decoder.feed_bytes(buf);
480 :
481 620 : if self.write_record_lsn <= self.flush_lsn {
482 620 : // We may have flushed a previously written record.
483 620 : self.flush_record_lsn = self.write_record_lsn;
484 620 : }
485 1240 : while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
486 620 : self.write_record_lsn = lsn;
487 620 : if lsn <= self.flush_lsn {
488 0 : self.flush_record_lsn = lsn;
489 620 : }
490 : }
491 :
492 620 : Ok(())
493 620 : }
494 :
495 620 : async fn flush_wal(&mut self) -> Result<()> {
496 620 : if self.flush_record_lsn == self.write_record_lsn {
497 : // no need to do extra flush
498 0 : return Ok(());
499 620 : }
500 :
501 620 : if let Some(unflushed_file) = self.file.take() {
502 620 : self.fdatasync_file(&unflushed_file)
503 620 : .await
504 : /* BEGIN_HADRON */
505 620 : .inspect_err(|_| WAL_DISK_IO_ERRORS.inc())?;
506 : /* END_HADRON */
507 620 : self.file = Some(unflushed_file);
508 : } else {
509 : // We have unflushed data (write_lsn != flush_lsn), but no file. This
510 : // shouldn't happen, since the segment is flushed on close.
511 0 : bail!(
512 0 : "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
513 : self.write_lsn,
514 : self.flush_record_lsn
515 : );
516 : }
517 :
518 : // everything is flushed now, let's update flush_lsn
519 620 : self.flush_lsn = self.write_lsn;
520 620 : self.flush_record_lsn = self.write_record_lsn;
521 620 : Ok(())
522 620 : }
523 :
524 : /// Truncate written WAL by removing all WAL segments after the given LSN.
525 : /// end_pos must point to the end of the WAL record.
526 5 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
527 5 : let _timer = WAL_STORAGE_OPERATION_SECONDS
528 5 : .with_label_values(&["truncate_wal"])
529 5 : .start_timer();
530 :
531 : // Streaming must not create a hole, so truncate cannot be called on
532 : // non-written lsn.
533 5 : if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
534 0 : bail!(
535 0 : "truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
536 : self.write_record_lsn,
537 : end_pos
538 : );
539 5 : }
540 :
541 : // Quick exit if nothing to do and we know that the state is clean to
542 : // avoid writing up to 16 MiB of zeros on disk (this happens on each
543 : // connect).
544 5 : if !self.pending_wal_truncation
545 0 : && end_pos == self.write_lsn
546 0 : && end_pos == self.flush_record_lsn
547 : {
548 0 : return Ok(());
549 5 : }
550 :
551 : // Atomicity: we start with LSNs reset because once on disk deletion is
552 : // started it can't be reversed. However, we might crash/error in the
553 : // middle, leaving garbage above the truncation point. In theory,
554 : // concatenated with previous records it might form bogus WAL (though
555 : // very unlikely in practice because CRC would guard from that). To
556 : // protect, set pending_wal_truncation flag before beginning: it means
557 : // truncation must be retried and WAL writes are prohibited until it
558 : // succeeds. Flag is also set on boot because we don't know if the last
559 : // state was clean.
560 : //
561 : // Protocol (HandleElected before first AppendRequest) ensures we'll
562 : // always try to ensure clean truncation before any writes.
563 5 : self.pending_wal_truncation = true;
564 :
565 5 : self.write_lsn = end_pos;
566 5 : self.flush_lsn = end_pos;
567 5 : self.write_record_lsn = end_pos;
568 5 : self.flush_record_lsn = end_pos;
569 :
570 : // Close previously opened file, if any
571 5 : if let Some(unflushed_file) = self.file.take() {
572 0 : self.fdatasync_file(&unflushed_file).await?;
573 5 : }
574 :
575 5 : let xlogoff = end_pos.segment_offset(self.wal_seg_size);
576 5 : let segno = end_pos.segment_number(self.wal_seg_size);
577 :
578 : // Remove all segments after the given LSN.
579 5 : remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
580 :
581 5 : let (file, is_partial) = self.open_or_create(segno).await?;
582 :
583 : // Fill end with zeroes
584 5 : file.set_len(xlogoff as u64).await?;
585 5 : file.set_len(self.wal_seg_size as u64).await?;
586 5 : self.fsync_file(&file).await?;
587 :
588 5 : if !is_partial {
589 : // Make segment partial once again
590 0 : let (wal_file_path, wal_file_partial_path) =
591 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
592 0 : fs::rename(wal_file_path, wal_file_partial_path).await?;
593 5 : }
594 :
595 5 : self.pending_wal_truncation = false;
596 5 : info!("truncated WAL to {}", end_pos);
597 5 : Ok(())
598 5 : }
599 :
600 0 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
601 0 : let timeline_dir = self.timeline_dir.clone();
602 0 : let wal_seg_size = self.wal_seg_size;
603 0 : Box::pin(async move {
604 0 : remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
605 0 : })
606 0 : }
607 :
608 0 : fn close(&mut self) {
609 : // close happens in destructor
610 0 : let _open_file = self.file.take();
611 0 : }
612 :
613 0 : fn get_metrics(&self) -> WalStorageMetrics {
614 0 : self.metrics.clone()
615 0 : }
616 : }
617 :
618 : /// Remove all WAL segments in timeline_dir that match the given predicate.
619 5 : async fn remove_segments_from_disk(
620 5 : timeline_dir: &Utf8Path,
621 5 : wal_seg_size: usize,
622 5 : remove_predicate: impl Fn(XLogSegNo) -> bool,
623 5 : ) -> Result<()> {
624 5 : let _timer = WAL_STORAGE_OPERATION_SECONDS
625 5 : .with_label_values(&["remove_segments_from_disk"])
626 5 : .start_timer();
627 :
628 5 : let mut n_removed = 0;
629 5 : let mut min_removed = u64::MAX;
630 5 : let mut max_removed = u64::MIN;
631 :
632 5 : let mut entries = fs::read_dir(timeline_dir).await?;
633 15 : while let Some(entry) = entries.next_entry().await? {
634 10 : let entry_path = entry.path();
635 10 : let fname = entry_path.file_name().unwrap();
636 : /* Ignore files that are not XLOG segments */
637 10 : if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
638 5 : continue;
639 5 : }
640 5 : let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
641 5 : if remove_predicate(segno) {
642 0 : remove_file(entry_path).await?;
643 0 : n_removed += 1;
644 0 : min_removed = min(min_removed, segno);
645 0 : max_removed = max(max_removed, segno);
646 0 : REMOVED_WAL_SEGMENTS.inc();
647 5 : }
648 : }
649 :
650 5 : if n_removed > 0 {
651 0 : info!(
652 0 : "removed {} WAL segments [{}; {}]",
653 : n_removed, min_removed, max_removed
654 : );
655 5 : }
656 5 : Ok(())
657 5 : }
658 :
659 : pub struct WalReader {
660 : remote_path: RemotePath,
661 : timeline_dir: Utf8PathBuf,
662 : wal_seg_size: usize,
663 : pos: Lsn,
664 : wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
665 :
666 : // S3 will be used to read WAL if LSN is not available locally
667 : wal_backup: Arc<WalBackup>,
668 :
669 : // We don't have WAL locally if LSN is less than local_start_lsn
670 : local_start_lsn: Lsn,
671 : // We will respond with zero-ed bytes before this Lsn as long as
672 : // pos is in the same segment as timeline_start_lsn.
673 : timeline_start_lsn: Lsn,
674 : // integer version number of PostgreSQL, e.g. 14; 15; 16
675 : pg_version: PgMajorVersion,
676 : system_id: SystemId,
677 : timeline_start_segment: Option<Bytes>,
678 : }
679 :
680 : impl WalReader {
681 9 : pub fn new(
682 9 : ttid: &TenantTimelineId,
683 9 : timeline_dir: Utf8PathBuf,
684 9 : state: &TimelinePersistentState,
685 9 : start_pos: Lsn,
686 9 : wal_backup: Arc<WalBackup>,
687 9 : ) -> Result<Self> {
688 9 : if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
689 0 : bail!("state uninitialized, no data to read");
690 9 : }
691 :
692 : // TODO: Upgrade to bail!() once we know this couldn't possibly happen
693 9 : if state.timeline_start_lsn == Lsn(0) {
694 0 : warn!("timeline_start_lsn uninitialized before initializing wal reader");
695 9 : }
696 :
697 9 : if start_pos
698 9 : < state
699 9 : .timeline_start_lsn
700 9 : .segment_lsn(state.server.wal_seg_size as usize)
701 : {
702 0 : bail!(
703 0 : "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
704 : start_pos,
705 : state.timeline_start_lsn
706 : );
707 9 : }
708 :
709 : Ok(Self {
710 9 : remote_path: remote_timeline_path(ttid)?,
711 9 : timeline_dir,
712 9 : wal_seg_size: state.server.wal_seg_size as usize,
713 9 : pos: start_pos,
714 9 : wal_segment: None,
715 9 : wal_backup,
716 9 : local_start_lsn: state.local_start_lsn,
717 9 : timeline_start_lsn: state.timeline_start_lsn,
718 9 : pg_version: PgMajorVersion::try_from(state.server.pg_version).unwrap(),
719 9 : system_id: state.server.system_id,
720 9 : timeline_start_segment: None,
721 : })
722 9 : }
723 :
724 : /// Read WAL at current position into provided buf, returns number of bytes
725 : /// read. It can be smaller than buf size only if segment boundary is
726 : /// reached.
727 79 : pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
728 : // If this timeline is new, we may not have a full segment yet, so
729 : // we pad the first bytes of the timeline's first WAL segment with 0s
730 79 : if self.pos < self.timeline_start_lsn {
731 0 : debug_assert_eq!(
732 0 : self.pos.segment_number(self.wal_seg_size),
733 0 : self.timeline_start_lsn.segment_number(self.wal_seg_size)
734 : );
735 :
736 : // All bytes after timeline_start_lsn are in WAL, but those before
737 : // are not, so we manually construct an empty segment for the bytes
738 : // not available in this timeline.
739 0 : if self.timeline_start_segment.is_none() {
740 0 : let it = postgres_ffi::generate_wal_segment(
741 0 : self.timeline_start_lsn.segment_number(self.wal_seg_size),
742 0 : self.system_id,
743 0 : self.pg_version,
744 0 : self.timeline_start_lsn,
745 0 : )?;
746 0 : self.timeline_start_segment = Some(it);
747 0 : }
748 :
749 0 : assert!(self.timeline_start_segment.is_some());
750 0 : let segment = self.timeline_start_segment.take().unwrap();
751 :
752 0 : let seg_bytes = &segment[..];
753 :
754 : // How much of the current segment have we already consumed?
755 0 : let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
756 :
757 : // How many bytes may we consume in total?
758 0 : let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
759 :
760 0 : debug_assert!(seg_bytes.len() > pos_seg_offset);
761 0 : debug_assert!(seg_bytes.len() > tl_start_seg_offset);
762 :
763 : // Copy as many bytes as possible into the buffer
764 0 : let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
765 0 : buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
766 :
767 0 : self.pos += len as u64;
768 :
769 : // If we're done with the segment, we can release it's memory.
770 : // However, if we're not yet done, store it so that we don't have to
771 : // construct the segment the next time this function is called.
772 0 : if self.pos < self.timeline_start_lsn {
773 0 : self.timeline_start_segment = Some(segment);
774 0 : }
775 :
776 0 : return Ok(len);
777 79 : }
778 :
779 79 : let mut wal_segment = match self.wal_segment.take() {
780 70 : Some(reader) => reader,
781 9 : None => self.open_segment().await?,
782 : };
783 :
784 : // How much to read and send in message? We cannot cross the WAL file
785 : // boundary, and we don't want send more than provided buffer.
786 78 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
787 78 : let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
788 :
789 : // Read some data from the file.
790 78 : let buf = &mut buf[0..send_size];
791 78 : let send_size = wal_segment.read_exact(buf).await?;
792 76 : self.pos += send_size as u64;
793 :
794 : // Decide whether to reuse this file. If we don't set wal_segment here
795 : // a new reader will be opened next time.
796 76 : if self.pos.segment_offset(self.wal_seg_size) != 0 {
797 76 : self.wal_segment = Some(wal_segment);
798 76 : }
799 :
800 76 : Ok(send_size)
801 76 : }
802 :
803 : /// Open WAL segment at the current position of the reader.
804 9 : async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
805 9 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
806 9 : let segno = self.pos.segment_number(self.wal_seg_size);
807 9 : let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
808 :
809 : // Try to open local file, if we may have WAL locally
810 9 : if self.pos >= self.local_start_lsn {
811 9 : let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await?;
812 8 : if let Some((mut file, _)) = res {
813 8 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
814 8 : return Ok(Box::pin(file));
815 0 : } else {
816 0 : // NotFound is expected, fall through to remote read
817 0 : }
818 0 : }
819 :
820 : // Try to open remote file, if remote reads are enabled
821 0 : if let Some(storage) = self.wal_backup.get_storage() {
822 0 : let remote_wal_file_path = self.remote_path.join(&wal_file_name);
823 0 : return read_object(&storage, &remote_wal_file_path, xlogoff as u64).await;
824 0 : }
825 :
826 0 : bail!("WAL segment is not found")
827 8 : }
828 : }
829 :
830 : /// Helper function for opening WAL segment `segno` in `dir`. Returns file and
831 : /// whether it is .partial.
832 9 : pub(crate) async fn open_wal_file(
833 9 : timeline_dir: &Utf8Path,
834 9 : segno: XLogSegNo,
835 9 : wal_seg_size: usize,
836 9 : ) -> Result<Option<(tokio::fs::File, bool)>> {
837 9 : let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size);
838 :
839 : // First try to open the .partial file.
840 9 : let mut partial_path = wal_file_path.to_owned();
841 9 : partial_path.set_extension("partial");
842 9 : if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
843 8 : return Ok(Some((opened_file, true)));
844 0 : }
845 :
846 : // If that failed, try it without the .partial extension.
847 0 : let pf_res = tokio::fs::File::open(&wal_file_path).await;
848 0 : if let Err(e) = &pf_res {
849 0 : if e.kind() == ErrorKind::NotFound {
850 0 : return Ok(None);
851 0 : }
852 0 : }
853 0 : let pf = pf_res
854 0 : .with_context(|| format!("failed to open WAL file {wal_file_path:#}"))
855 0 : .map_err(|e| {
856 0 : warn!("{e}");
857 0 : e
858 0 : })?;
859 :
860 0 : Ok(Some((pf, false)))
861 8 : }
862 :
863 : /// Helper returning full path to WAL segment file and its .partial brother.
864 24 : pub fn wal_file_paths(
865 24 : timeline_dir: &Utf8Path,
866 24 : segno: XLogSegNo,
867 24 : wal_seg_size: usize,
868 24 : ) -> (Utf8PathBuf, Utf8PathBuf) {
869 24 : let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
870 24 : let wal_file_path = timeline_dir.join(wal_file_name.clone());
871 24 : let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
872 24 : (wal_file_path, wal_file_partial_path)
873 24 : }
|