Line data Source code
1 : //! This module has everything to deal with WAL -- reading and writing to disk.
2 : //!
3 : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
//! PG timeline is always 1, so WAL segments usually have names like this:
5 : //! - 000000010000000000000001
6 : //! - 000000010000000000000002.partial
7 : //!
8 : //! Note that last file has `.partial` suffix, that's different from postgres.
9 :
10 : use anyhow::{bail, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use futures::future::BoxFuture;
14 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
15 : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
16 : use remote_storage::RemotePath;
17 : use std::cmp::{max, min};
18 : use std::future::Future;
19 : use std::io::{self, SeekFrom};
20 : use std::pin::Pin;
21 : use tokio::fs::{self, remove_file, File, OpenOptions};
22 : use tokio::io::{AsyncRead, AsyncWriteExt};
23 : use tokio::io::{AsyncReadExt, AsyncSeekExt};
24 : use tracing::*;
25 : use utils::crashsafe::durable_rename;
26 :
27 : use crate::metrics::{
28 : time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS, WAL_STORAGE_OPERATION_SECONDS,
29 : };
30 : use crate::state::TimelinePersistentState;
31 : use crate::wal_backup::{read_object, remote_timeline_path};
32 : use postgres_ffi::waldecoder::WalStreamDecoder;
33 : use postgres_ffi::XLogFileName;
34 : use pq_proto::SystemId;
35 : use utils::{id::TenantTimelineId, lsn::Lsn};
36 :
/// Interface of a per-timeline WAL storage: exposes the write/flush positions
/// and the operations to initialize, append, truncate, flush and remove WAL.
pub trait Storage {
    /// Last written LSN.
    fn write_lsn(&self) -> Lsn;
    /// LSN of last durably stored WAL record.
    fn flush_lsn(&self) -> Lsn;

    /// Initialize segment by creating proper long header at the beginning of
    /// the segment and short header at the page of given LSN. This is only used
    /// for timeline initialization because compute will stream data only since
    /// init_lsn. Other segment headers are included in compute stream.
    fn initialize_first_segment(
        &mut self,
        init_lsn: Lsn,
    ) -> impl Future<Output = Result<()>> + Send;

    /// Write piece of WAL from buf to disk, but not necessarily sync it.
    fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> impl Future<Output = Result<()>> + Send;

    /// Truncate WAL at specified LSN, which must be the end of WAL record.
    fn truncate_wal(&mut self, end_pos: Lsn) -> impl Future<Output = Result<()>> + Send;

    /// Durably store WAL on disk, up to the last written WAL record.
    fn flush_wal(&mut self) -> impl Future<Output = Result<()>> + Send;

    /// Remove all segments <= given segno. Returns a `'static` future doing
    /// that (it does not borrow `self`) as we want to perform it without
    /// timeline lock.
    fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;

    /// Release resources associated with the storage -- technically, close FDs.
    /// Currently we don't remove timelines until restart (#3146), so need to
    /// spare descriptors. This would be useful for temporary tli detach as
    /// well.
    ///
    /// The default implementation does nothing.
    fn close(&mut self) {}

    /// Get metrics for this timeline.
    fn get_metrics(&self) -> WalStorageMetrics;
}
74 :
/// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
/// for better performance. Storage is initialized in the constructor.
///
/// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
/// its filename and may be not fully flushed.
///
/// Normal relationship of LSNs (see the note on `write_record_lsn` for exceptions):
/// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
///
/// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
    /// Write/flush metrics collected by this storage; cloned out by `get_metrics`.
    metrics: WalStorageMetrics,
    /// Directory holding the WAL segment files of this timeline.
    timeline_dir: Utf8PathBuf,

    /// Disables fsync if true.
    no_sync: bool,

    /// Size of WAL segment in bytes.
    wal_seg_size: usize,
    /// Postgres version as stored in the persistent state; divided by 10000
    /// wherever the major version is needed.
    pg_version: u32,
    /// System identifier passed to WAL segment generation.
    system_id: u64,

    /// Written to disk, but possibly still in the cache and not fully persisted.
    /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
    write_lsn: Lsn,

    /// The LSN of the last WAL record written to disk. Still can be not fully
    /// flushed.
    ///
    /// Note: Normally it (and flush_record_lsn) is <= write_lsn, but after xlog
    /// switch ingest the reverse is true because we don't bump write_lsn up to
    /// the next segment: WAL stream from the compute doesn't have the gap and
    /// for simplicity / as a sanity check we disallow any non-sequential
    /// writes, so write zeros as is.
    ///
    /// Similar effect is in theory possible due to LSN alignment: if record
    /// ends at *2, decoder will report end lsn as *8 even though we haven't
    /// written these zeros yet. In practice compute likely never sends
    /// non-aligned chunks of data.
    write_record_lsn: Lsn,

    /// The last LSN flushed to disk. May be in the middle of a record.
    ///
    /// NB: when the rest of the system refers to `flush_lsn`, it usually
    /// actually refers to `flush_record_lsn`. This ambiguity can be dangerous
    /// and should be resolved.
    flush_lsn: Lsn,

    /// The LSN of the last WAL record flushed to disk.
    flush_record_lsn: Lsn,

    /// Decoder is required for detecting boundaries of WAL records.
    decoder: WalStreamDecoder,

    /// Cached open file for the last segment.
    ///
    /// If Some(file) is open, then it always:
    /// - has ".partial" suffix
    /// - points to write_lsn, so no seek is needed for writing
    /// - doesn't point to the end of the segment
    file: Option<File>,

    /// When true, WAL truncation potentially has been interrupted and we need
    /// to finish it before allowing WAL writes; see truncate_wal for details.
    /// In this case [`write_lsn`] can be less than actually written WAL on
    /// disk. In particular, there can be a case with unexpected .partial file.
    ///
    /// Imagine the following:
    /// - 000000010000000000000001
    ///   - it was fully written, but the last record is split between 2
    ///     segments
    ///   - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in
    ///     the end of this segment
    ///   - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were
    ///     initialized to 0/1FFFFF0
    /// - 000000010000000000000002.partial
    ///   - it has only 1 byte written, which is not enough to make a full WAL
    ///     record
    ///
    /// Partial segment 002 has no WAL records, and it will be removed by the
    /// next truncate_wal(). The flag starts as true in the constructor and is
    /// reset to false once a truncate_wal() call completes successfully.
    ///
    /// [`write_lsn`]: Self::write_lsn
    pending_wal_truncation: bool,
}
161 :
162 : impl PhysicalStorage {
163 : /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
164 : /// the disk. Otherwise, all LSNs are set to zero.
165 0 : pub fn new(
166 0 : ttid: &TenantTimelineId,
167 0 : timeline_dir: &Utf8Path,
168 0 : state: &TimelinePersistentState,
169 0 : no_sync: bool,
170 0 : ) -> Result<PhysicalStorage> {
171 0 : let wal_seg_size = state.server.wal_seg_size as usize;
172 :
173 : // Find out where stored WAL ends, starting at commit_lsn which is a
174 : // known recent record boundary (unless we don't have WAL at all).
175 : //
176 : // NB: find_end_of_wal MUST be backwards compatible with the previously
177 : // written WAL. If find_end_of_wal fails to read any WAL written by an
178 : // older version of the code, we could lose data forever.
179 0 : let write_lsn = if state.commit_lsn == Lsn(0) {
180 0 : Lsn(0)
181 : } else {
182 0 : let version = state.server.pg_version / 10000;
183 0 :
184 0 : dispatch_pgversion!(
185 0 : version,
186 0 : pgv::xlog_utils::find_end_of_wal(
187 0 : timeline_dir.as_std_path(),
188 0 : wal_seg_size,
189 0 : state.commit_lsn,
190 0 : )?,
191 0 : bail!("unsupported postgres version: {}", version)
192 : )
193 : };
194 :
195 : // note: this assumes we fsync'ed whole datadir on start.
196 0 : let flush_lsn = write_lsn;
197 0 :
198 0 : debug!(
199 0 : "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
200 : ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
201 : );
202 0 : if flush_lsn < state.commit_lsn {
203 0 : bail!("timeline {} potential data loss: flush_lsn {} by find_end_of_wal is less than commit_lsn {} from control file", ttid.timeline_id, flush_lsn, state.commit_lsn);
204 0 : }
205 0 : if flush_lsn < state.peer_horizon_lsn {
206 0 : warn!(
207 0 : "timeline {}: flush_lsn {} is less than cfile peer_horizon_lsn {}",
208 : ttid.timeline_id, flush_lsn, state.peer_horizon_lsn
209 : );
210 0 : }
211 :
212 0 : Ok(PhysicalStorage {
213 0 : metrics: WalStorageMetrics::default(),
214 0 : timeline_dir: timeline_dir.to_path_buf(),
215 0 : no_sync,
216 0 : wal_seg_size,
217 0 : pg_version: state.server.pg_version,
218 0 : system_id: state.server.system_id,
219 0 : write_lsn,
220 0 : write_record_lsn: write_lsn,
221 0 : flush_lsn,
222 0 : flush_record_lsn: flush_lsn,
223 0 : decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
224 0 : file: None,
225 0 : pending_wal_truncation: true,
226 0 : })
227 0 : }
228 :
229 : /// Get all known state of the storage.
230 0 : pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
231 0 : (
232 0 : self.write_lsn,
233 0 : self.write_record_lsn,
234 0 : self.flush_record_lsn,
235 0 : self.file.is_some(),
236 0 : )
237 0 : }
238 :
239 : /// Call fsync if config requires so.
240 0 : async fn fsync_file(&mut self, file: &File) -> Result<()> {
241 0 : if !self.no_sync {
242 0 : self.metrics
243 0 : .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
244 0 : }
245 0 : Ok(())
246 0 : }
247 :
248 : /// Call fdatasync if config requires so.
249 0 : async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
250 0 : if !self.no_sync {
251 0 : self.metrics
252 0 : .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
253 0 : }
254 0 : Ok(())
255 0 : }
256 :
257 : /// Open or create WAL segment file. Caller must call seek to the wanted position.
258 : /// Returns `file` and `is_partial`.
259 0 : async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
260 0 : let (wal_file_path, wal_file_partial_path) =
261 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
262 :
263 : // Try to open already completed segment
264 0 : if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
265 0 : Ok((file, false))
266 0 : } else if let Ok(file) = OpenOptions::new()
267 0 : .write(true)
268 0 : .open(&wal_file_partial_path)
269 0 : .await
270 : {
271 : // Try to open existing partial file
272 0 : Ok((file, true))
273 : } else {
274 0 : let _timer = WAL_STORAGE_OPERATION_SECONDS
275 0 : .with_label_values(&["initialize_segment"])
276 0 : .start_timer();
277 0 : // Create and fill new partial file
278 0 : //
279 0 : // We're using fdatasync during WAL writing, so file size must not
280 0 : // change; to this end it is filled with zeros here. To avoid using
281 0 : // half initialized segment, first bake it under tmp filename and
282 0 : // then rename.
283 0 : let tmp_path = self.timeline_dir.join("waltmp");
284 0 : let file = File::create(&tmp_path)
285 0 : .await
286 0 : .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
287 :
288 0 : fail::fail_point!("sk-zero-segment", |_| {
289 0 : info!("sk-zero-segment failpoint hit");
290 0 : Err(anyhow::anyhow!("failpoint: sk-zero-segment"))
291 0 : });
292 0 : file.set_len(self.wal_seg_size as u64).await?;
293 :
294 0 : if let Err(e) = durable_rename(&tmp_path, &wal_file_partial_path, !self.no_sync).await {
295 : // Probably rename succeeded, but fsync of it failed. Remove
296 : // the file then to avoid using it.
297 0 : remove_file(wal_file_partial_path)
298 0 : .await
299 0 : .or_else(utils::fs_ext::ignore_not_found)?;
300 0 : return Err(e.into());
301 0 : }
302 0 : Ok((file, true))
303 : }
304 0 : }
305 :
306 : /// Write WAL bytes, which are known to be located in a single WAL segment. Returns true if the
307 : /// segment was completed, closed, and flushed to disk.
308 0 : async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<bool> {
309 0 : let mut file = if let Some(file) = self.file.take() {
310 0 : file
311 : } else {
312 0 : let (mut file, is_partial) = self.open_or_create(segno).await?;
313 0 : assert!(is_partial, "unexpected write into non-partial segment file");
314 0 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
315 0 : file
316 : };
317 :
318 0 : file.write_all(buf).await?;
319 : // Note: flush just ensures write above reaches the OS (this is not
320 : // needed in case of sync IO as Write::write there calls directly write
321 : // syscall, but needed in case of async). It does *not* fsyncs the file.
322 0 : file.flush().await?;
323 :
324 0 : if xlogoff + buf.len() == self.wal_seg_size {
325 : // If we reached the end of a WAL segment, flush and close it.
326 0 : self.fdatasync_file(&file).await?;
327 :
328 : // Rename partial file to completed file
329 0 : let (wal_file_path, wal_file_partial_path) =
330 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
331 0 : fs::rename(wal_file_partial_path, wal_file_path).await?;
332 0 : Ok(true)
333 : } else {
334 : // otherwise, file can be reused later
335 0 : self.file = Some(file);
336 0 : Ok(false)
337 : }
338 0 : }
339 :
340 : /// Writes WAL to the segment files, until everything is writed. If some segments
341 : /// are fully written, they are flushed to disk. The last (partial) segment can
342 : /// be flushed separately later.
343 : ///
344 : /// Updates `write_lsn` and `flush_lsn`.
345 0 : async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
346 0 : // TODO: this shouldn't be possible, except possibly with write_lsn == 0.
347 0 : // Rename this method to `append_exact`, and make it append-only, removing
348 0 : // the `pos` parameter and this check. For this reason, we don't update
349 0 : // `flush_lsn` here.
350 0 : if self.write_lsn != pos {
351 : // need to flush the file before discarding it
352 0 : if let Some(file) = self.file.take() {
353 0 : self.fdatasync_file(&file).await?;
354 0 : }
355 :
356 0 : self.write_lsn = pos;
357 0 : }
358 :
359 0 : while !buf.is_empty() {
360 : // Extract WAL location for this block
361 0 : let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
362 0 : let segno = self.write_lsn.segment_number(self.wal_seg_size);
363 :
364 : // If crossing a WAL boundary, only write up until we reach wal segment size.
365 0 : let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
366 0 : self.wal_seg_size - xlogoff
367 : } else {
368 0 : buf.len()
369 : };
370 :
371 0 : let flushed = self
372 0 : .write_in_segment(segno, xlogoff, &buf[..bytes_write])
373 0 : .await?;
374 0 : self.write_lsn += bytes_write as u64;
375 0 : if flushed {
376 0 : self.flush_lsn = self.write_lsn;
377 0 : }
378 0 : buf = &buf[bytes_write..];
379 : }
380 :
381 0 : Ok(())
382 0 : }
383 : }
384 :
385 : impl Storage for PhysicalStorage {
    /// Last written LSN (the `write_lsn` field; may be in the middle of a WAL record).
    fn write_lsn(&self) -> Lsn {
        self.write_lsn
    }
    /// flush_lsn returns LSN of last durably stored WAL record.
    ///
    /// Note that this is the `flush_record_lsn` field, not the `flush_lsn`
    /// field, hence the clippy allow below.
    ///
    /// TODO: flush_lsn() returns flush_record_lsn, but write_lsn() returns write_lsn: confusing.
    #[allow(clippy::misnamed_getters)]
    fn flush_lsn(&self) -> Lsn {
        self.flush_record_lsn
    }
397 :
398 0 : async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
399 0 : let _timer = WAL_STORAGE_OPERATION_SECONDS
400 0 : .with_label_values(&["initialize_first_segment"])
401 0 : .start_timer();
402 0 :
403 0 : let segno = init_lsn.segment_number(self.wal_seg_size);
404 0 : let (mut file, _) = self.open_or_create(segno).await?;
405 0 : let major_pg_version = self.pg_version / 10000;
406 0 : let wal_seg =
407 0 : postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
408 0 : file.seek(SeekFrom::Start(0)).await?;
409 0 : file.write_all(&wal_seg).await?;
410 0 : file.flush().await?;
411 0 : info!("initialized segno {} at lsn {}", segno, init_lsn);
412 : // note: file is *not* fsynced
413 0 : Ok(())
414 0 : }
415 :
    /// Write WAL to disk.
    ///
    /// Appends `buf` at `startpos`, which must equal the current `write_lsn`
    /// (unless `write_lsn` is still zero). The data is not necessarily synced;
    /// afterwards the record decoder is fed with `buf` to advance
    /// `write_record_lsn` and, where already covered by `flush_lsn`,
    /// `flush_record_lsn`.
    ///
    /// # Errors
    ///
    /// Fails if the write would overwrite existing WAL or leave a gap, if a WAL
    /// truncation is still pending, or if writing or decoding fails.
    async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
        // Disallow any non-sequential writes, which can result in gaps or overwrites.
        // If we need to move the pointer, use truncate_wal() instead.
        if self.write_lsn > startpos {
            bail!(
                "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
                self.write_lsn,
                startpos
            );
        }
        // A zero write_lsn means nothing was written yet, so any start is accepted.
        if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
            bail!(
                "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
                self.write_lsn,
                startpos
            );
        }
        // Writes are prohibited until truncate_wal() has completed successfully.
        if self.pending_wal_truncation {
            bail!(
                "write_wal called with pending WAL truncation, write_lsn={}, startpos={}",
                self.write_lsn,
                startpos
            );
        }

        let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
        // WAL is written, updating write metrics
        self.metrics.observe_write_seconds(write_seconds);
        self.metrics.observe_write_bytes(buf.len());

        // Figure out the last record's end LSN and update `write_record_lsn`
        // (if we got a whole record). The write may also have closed and
        // flushed a segment, so update `flush_record_lsn` as well.
        if self.decoder.available() != startpos {
            // The decoder is not positioned at the start of this write:
            // recreate it at startpos, keeping the same postgres version.
            info!(
                "restart decoder from {} to {}",
                self.decoder.available(),
                startpos,
            );
            let pg_version = self.decoder.pg_version;
            self.decoder = WalStreamDecoder::new(startpos, pg_version);
        }
        self.decoder.feed_bytes(buf);

        if self.write_record_lsn <= self.flush_lsn {
            // We may have flushed a previously written record.
            self.flush_record_lsn = self.write_record_lsn;
        }
        // Drain every record the decoder can produce from the bytes fed so far.
        while let Some((lsn, _rec)) = self.decoder.poll_decode()? {
            self.write_record_lsn = lsn;
            if lsn <= self.flush_lsn {
                self.flush_record_lsn = lsn;
            }
        }

        Ok(())
    }
474 :
475 0 : async fn flush_wal(&mut self) -> Result<()> {
476 0 : if self.flush_record_lsn == self.write_record_lsn {
477 : // no need to do extra flush
478 0 : return Ok(());
479 0 : }
480 :
481 0 : if let Some(unflushed_file) = self.file.take() {
482 0 : self.fdatasync_file(&unflushed_file).await?;
483 0 : self.file = Some(unflushed_file);
484 : } else {
485 : // We have unflushed data (write_lsn != flush_lsn), but no file. This
486 : // shouldn't happen, since the segment is flushed on close.
487 0 : bail!(
488 0 : "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
489 0 : self.write_lsn,
490 0 : self.flush_record_lsn
491 0 : );
492 : }
493 :
494 : // everything is flushed now, let's update flush_lsn
495 0 : self.flush_lsn = self.write_lsn;
496 0 : self.flush_record_lsn = self.write_record_lsn;
497 0 : Ok(())
498 0 : }
499 :
    /// Truncate written WAL by removing all WAL segments after the given LSN.
    /// end_pos must point to the end of the WAL record.
    ///
    /// On success all four LSNs equal `end_pos`, the segment containing
    /// `end_pos` is zero-filled past `end_pos` and carries the `.partial`
    /// suffix, and `pending_wal_truncation` is cleared. If any step fails the
    /// flag stays set, so writes remain prohibited until a retry succeeds.
    ///
    /// # Errors
    ///
    /// Fails if `end_pos` is beyond the last written record (unless nothing
    /// has been written yet), or on any I/O error.
    async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
        let _timer = WAL_STORAGE_OPERATION_SECONDS
            .with_label_values(&["truncate_wal"])
            .start_timer();

        // Streaming must not create a hole, so truncate cannot be called on
        // non-written lsn.
        if self.write_record_lsn != Lsn(0) && end_pos > self.write_record_lsn {
            bail!(
                "truncate_wal called on non-written WAL, write_record_lsn={}, end_pos={}",
                self.write_record_lsn,
                end_pos
            );
        }

        // Quick exit if nothing to do and we know that the state is clean to
        // avoid writing up to 16 MiB of zeros on disk (this happens on each
        // connect).
        if !self.pending_wal_truncation
            && end_pos == self.write_lsn
            && end_pos == self.flush_record_lsn
        {
            return Ok(());
        }

        // Atomicity: we start with LSNs reset because once on disk deletion is
        // started it can't be reversed. However, we might crash/error in the
        // middle, leaving garbage above the truncation point. In theory,
        // concatenated with previous records it might form bogus WAL (though
        // very unlikely in practice because CRC would guard from that). To
        // protect, set pending_wal_truncation flag before beginning: it means
        // truncation must be retried and WAL writes are prohibited until it
        // succeeds. Flag is also set on boot because we don't know if the last
        // state was clean.
        //
        // Protocol (HandleElected before first AppendRequest) ensures we'll
        // always try to ensure clean truncation before any writes.
        self.pending_wal_truncation = true;

        self.write_lsn = end_pos;
        self.flush_lsn = end_pos;
        self.write_record_lsn = end_pos;
        self.flush_record_lsn = end_pos;

        // Close previously opened file, if any
        if let Some(unflushed_file) = self.file.take() {
            self.fdatasync_file(&unflushed_file).await?;
        }

        let xlogoff = end_pos.segment_offset(self.wal_seg_size);
        let segno = end_pos.segment_number(self.wal_seg_size);

        // Remove all segments after the given LSN.
        remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;

        let (file, is_partial) = self.open_or_create(segno).await?;

        // Fill end with zeroes: shrinking to xlogoff drops everything past
        // end_pos, growing back to the full segment size restores the length.
        file.set_len(xlogoff as u64).await?;
        file.set_len(self.wal_seg_size as u64).await?;
        self.fsync_file(&file).await?;

        if !is_partial {
            // Make segment partial once again
            let (wal_file_path, wal_file_partial_path) =
                wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size);
            fs::rename(wal_file_path, wal_file_partial_path).await?;
        }

        // Truncation completed; WAL writes are allowed again.
        self.pending_wal_truncation = false;
        Ok(())
    }
574 :
575 0 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
576 0 : let timeline_dir = self.timeline_dir.clone();
577 0 : let wal_seg_size = self.wal_seg_size;
578 0 : Box::pin(async move {
579 0 : remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
580 0 : })
581 0 : }
582 :
583 0 : fn close(&mut self) {
584 0 : // close happens in destructor
585 0 : let _open_file = self.file.take();
586 0 : }
587 :
    /// Returns a clone of the metrics collected by this storage.
    fn get_metrics(&self) -> WalStorageMetrics {
        self.metrics.clone()
    }
591 : }
592 :
593 : /// Remove all WAL segments in timeline_dir that match the given predicate.
594 0 : async fn remove_segments_from_disk(
595 0 : timeline_dir: &Utf8Path,
596 0 : wal_seg_size: usize,
597 0 : remove_predicate: impl Fn(XLogSegNo) -> bool,
598 0 : ) -> Result<()> {
599 0 : let _timer = WAL_STORAGE_OPERATION_SECONDS
600 0 : .with_label_values(&["remove_segments_from_disk"])
601 0 : .start_timer();
602 0 :
603 0 : let mut n_removed = 0;
604 0 : let mut min_removed = u64::MAX;
605 0 : let mut max_removed = u64::MIN;
606 :
607 0 : let mut entries = fs::read_dir(timeline_dir).await?;
608 0 : while let Some(entry) = entries.next_entry().await? {
609 0 : let entry_path = entry.path();
610 0 : let fname = entry_path.file_name().unwrap();
611 0 : /* Ignore files that are not XLOG segments */
612 0 : if !IsXLogFileName(fname) && !IsPartialXLogFileName(fname) {
613 0 : continue;
614 0 : }
615 0 : let (segno, _) = XLogFromFileName(fname, wal_seg_size)?;
616 0 : if remove_predicate(segno) {
617 0 : remove_file(entry_path).await?;
618 0 : n_removed += 1;
619 0 : min_removed = min(min_removed, segno);
620 0 : max_removed = max(max_removed, segno);
621 0 : REMOVED_WAL_SEGMENTS.inc();
622 0 : }
623 : }
624 :
625 0 : if n_removed > 0 {
626 0 : info!(
627 0 : "removed {} WAL segments [{}; {}]",
628 : n_removed, min_removed, max_removed
629 : );
630 0 : }
631 0 : Ok(())
632 0 : }
633 :
/// Sequential reader of a timeline's WAL starting at a given LSN. Segments are
/// opened from the local timeline directory and, if enabled, from remote
/// storage when not found locally.
pub struct WalReader {
    /// Remote path of the timeline; segment file names are joined to it for
    /// remote reads.
    remote_path: RemotePath,
    /// Local directory with the timeline's WAL segment files.
    timeline_dir: Utf8PathBuf,
    /// Size of WAL segment in bytes.
    wal_seg_size: usize,
    /// Current read position; advanced by every `read`.
    pos: Lsn,
    /// Reader over the currently open segment, kept between `read` calls until
    /// the segment boundary is reached.
    wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,

    /// S3 will be used to read WAL if LSN is not available locally
    enable_remote_read: bool,

    /// We don't have WAL locally if LSN is less than local_start_lsn
    local_start_lsn: Lsn,
    /// We will respond with zero-ed bytes before this Lsn as long as
    /// pos is in the same segment as timeline_start_lsn.
    timeline_start_lsn: Lsn,
    /// integer version number of PostgreSQL, e.g. 14; 15; 16
    pg_version: u32,
    /// System identifier passed to WAL segment generation.
    system_id: SystemId,
    /// Generated first segment of the timeline, cached while `pos` is still
    /// before `timeline_start_lsn`.
    timeline_start_segment: Option<Bytes>,
}
654 :
655 : impl WalReader {
656 0 : pub fn new(
657 0 : ttid: &TenantTimelineId,
658 0 : timeline_dir: Utf8PathBuf,
659 0 : state: &TimelinePersistentState,
660 0 : start_pos: Lsn,
661 0 : enable_remote_read: bool,
662 0 : ) -> Result<Self> {
663 0 : if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
664 0 : bail!("state uninitialized, no data to read");
665 0 : }
666 0 :
667 0 : // TODO: Upgrade to bail!() once we know this couldn't possibly happen
668 0 : if state.timeline_start_lsn == Lsn(0) {
669 0 : warn!("timeline_start_lsn uninitialized before initializing wal reader");
670 0 : }
671 :
672 0 : if start_pos
673 0 : < state
674 0 : .timeline_start_lsn
675 0 : .segment_lsn(state.server.wal_seg_size as usize)
676 : {
677 0 : bail!(
678 0 : "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
679 0 : start_pos,
680 0 : state.timeline_start_lsn
681 0 : );
682 0 : }
683 0 :
684 0 : Ok(Self {
685 0 : remote_path: remote_timeline_path(ttid)?,
686 0 : timeline_dir,
687 0 : wal_seg_size: state.server.wal_seg_size as usize,
688 0 : pos: start_pos,
689 0 : wal_segment: None,
690 0 : enable_remote_read,
691 0 : local_start_lsn: state.local_start_lsn,
692 0 : timeline_start_lsn: state.timeline_start_lsn,
693 0 : pg_version: state.server.pg_version / 10000,
694 0 : system_id: state.server.system_id,
695 0 : timeline_start_segment: None,
696 : })
697 0 : }
698 :
699 : /// Read WAL at current position into provided buf, returns number of bytes
700 : /// read. It can be smaller than buf size only if segment boundary is
701 : /// reached.
702 0 : pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
703 0 : // If this timeline is new, we may not have a full segment yet, so
704 0 : // we pad the first bytes of the timeline's first WAL segment with 0s
705 0 : if self.pos < self.timeline_start_lsn {
706 0 : debug_assert_eq!(
707 0 : self.pos.segment_number(self.wal_seg_size),
708 0 : self.timeline_start_lsn.segment_number(self.wal_seg_size)
709 : );
710 :
711 : // All bytes after timeline_start_lsn are in WAL, but those before
712 : // are not, so we manually construct an empty segment for the bytes
713 : // not available in this timeline.
714 0 : if self.timeline_start_segment.is_none() {
715 0 : let it = postgres_ffi::generate_wal_segment(
716 0 : self.timeline_start_lsn.segment_number(self.wal_seg_size),
717 0 : self.system_id,
718 0 : self.pg_version,
719 0 : self.timeline_start_lsn,
720 0 : )?;
721 0 : self.timeline_start_segment = Some(it);
722 0 : }
723 :
724 0 : assert!(self.timeline_start_segment.is_some());
725 0 : let segment = self.timeline_start_segment.take().unwrap();
726 0 :
727 0 : let seg_bytes = &segment[..];
728 0 :
729 0 : // How much of the current segment have we already consumed?
730 0 : let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
731 0 :
732 0 : // How many bytes may we consume in total?
733 0 : let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
734 0 :
735 0 : debug_assert!(seg_bytes.len() > pos_seg_offset);
736 0 : debug_assert!(seg_bytes.len() > tl_start_seg_offset);
737 :
738 : // Copy as many bytes as possible into the buffer
739 0 : let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
740 0 : buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
741 0 :
742 0 : self.pos += len as u64;
743 0 :
744 0 : // If we're done with the segment, we can release it's memory.
745 0 : // However, if we're not yet done, store it so that we don't have to
746 0 : // construct the segment the next time this function is called.
747 0 : if self.pos < self.timeline_start_lsn {
748 0 : self.timeline_start_segment = Some(segment);
749 0 : }
750 :
751 0 : return Ok(len);
752 0 : }
753 :
754 0 : let mut wal_segment = match self.wal_segment.take() {
755 0 : Some(reader) => reader,
756 0 : None => self.open_segment().await?,
757 : };
758 :
759 : // How much to read and send in message? We cannot cross the WAL file
760 : // boundary, and we don't want send more than provided buffer.
761 0 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
762 0 : let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
763 0 :
764 0 : // Read some data from the file.
765 0 : let buf = &mut buf[0..send_size];
766 0 : let send_size = wal_segment.read_exact(buf).await?;
767 0 : self.pos += send_size as u64;
768 0 :
769 0 : // Decide whether to reuse this file. If we don't set wal_segment here
770 0 : // a new reader will be opened next time.
771 0 : if self.pos.segment_offset(self.wal_seg_size) != 0 {
772 0 : self.wal_segment = Some(wal_segment);
773 0 : }
774 :
775 0 : Ok(send_size)
776 0 : }
777 :
778 : /// Open WAL segment at the current position of the reader.
779 0 : async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
780 0 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
781 0 : let segno = self.pos.segment_number(self.wal_seg_size);
782 0 : let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
783 0 :
784 0 : // Try to open local file, if we may have WAL locally
785 0 : if self.pos >= self.local_start_lsn {
786 0 : let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
787 0 : match res {
788 0 : Ok((mut file, _)) => {
789 0 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
790 0 : return Ok(Box::pin(file));
791 : }
792 0 : Err(e) => {
793 0 : let is_not_found = e.chain().any(|e| {
794 0 : if let Some(e) = e.downcast_ref::<io::Error>() {
795 0 : e.kind() == io::ErrorKind::NotFound
796 : } else {
797 0 : false
798 : }
799 0 : });
800 0 : if !is_not_found {
801 0 : return Err(e);
802 0 : }
803 : // NotFound is expected, fall through to remote read
804 : }
805 : };
806 0 : }
807 :
808 : // Try to open remote file, if remote reads are enabled
809 0 : if self.enable_remote_read {
810 0 : let remote_wal_file_path = self.remote_path.join(&wal_file_name);
811 0 : return read_object(&remote_wal_file_path, xlogoff as u64).await;
812 0 : }
813 0 :
814 0 : bail!("WAL segment is not found")
815 0 : }
816 : }
817 :
818 : /// Helper function for opening WAL segment `segno` in `dir`. Returns file and
819 : /// whether it is .partial.
820 0 : pub(crate) async fn open_wal_file(
821 0 : timeline_dir: &Utf8Path,
822 0 : segno: XLogSegNo,
823 0 : wal_seg_size: usize,
824 0 : ) -> Result<(tokio::fs::File, bool)> {
825 0 : let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size);
826 0 :
827 0 : // First try to open the .partial file.
828 0 : let mut partial_path = wal_file_path.to_owned();
829 0 : partial_path.set_extension("partial");
830 0 : if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
831 0 : return Ok((opened_file, true));
832 0 : }
833 :
834 : // If that failed, try it without the .partial extension.
835 0 : let pf = tokio::fs::File::open(&wal_file_path)
836 0 : .await
837 0 : .with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
838 0 : .map_err(|e| {
839 0 : warn!("{}", e);
840 0 : e
841 0 : })?;
842 :
843 0 : Ok((pf, false))
844 0 : }
845 :
846 : /// Helper returning full path to WAL segment file and its .partial brother.
847 0 : pub fn wal_file_paths(
848 0 : timeline_dir: &Utf8Path,
849 0 : segno: XLogSegNo,
850 0 : wal_seg_size: usize,
851 0 : ) -> (Utf8PathBuf, Utf8PathBuf) {
852 0 : let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
853 0 : let wal_file_path = timeline_dir.join(wal_file_name.clone());
854 0 : let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
855 0 : (wal_file_path, wal_file_partial_path)
856 0 : }
|