Line data Source code
1 : //! This module has everything to deal with WAL -- reading and writing to disk.
2 : //!
3 : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
4 : //! PG timeline is always 1, so WAL segments are usually have names like this:
5 : //! - 000000010000000000000001
6 : //! - 000000010000000000000002.partial
7 : //!
8 : //! Note that last file has `.partial` suffix, that's different from postgres.
9 :
10 : use anyhow::{bail, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use futures::future::BoxFuture;
14 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
15 : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
16 : use remote_storage::RemotePath;
17 : use std::cmp::{max, min};
18 : use std::io::{self, SeekFrom};
19 : use std::pin::Pin;
20 : use tokio::fs::{self, remove_file, File, OpenOptions};
21 : use tokio::io::{AsyncRead, AsyncWriteExt};
22 : use tokio::io::{AsyncReadExt, AsyncSeekExt};
23 : use tracing::*;
24 : use utils::crashsafe::durable_rename;
25 :
26 : use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
27 : use crate::state::TimelinePersistentState;
28 : use crate::wal_backup::{read_object, remote_timeline_path};
29 : use crate::SafeKeeperConf;
30 : use postgres_ffi::waldecoder::WalStreamDecoder;
31 : use postgres_ffi::XLogFileName;
32 : use postgres_ffi::XLOG_BLCKSZ;
33 : use pq_proto::SystemId;
34 : use utils::{id::TenantTimelineId, lsn::Lsn};
35 :
36 : #[async_trait::async_trait]
37 : pub trait Storage {
38 : /// LSN of last durably stored WAL record.
39 : fn flush_lsn(&self) -> Lsn;
40 :
41 : /// Initialize segment by creating proper long header at the beginning of
42 : /// the segment and short header at the page of given LSN. This is only used
43 : /// for timeline initialization because compute will stream data only since
44 : /// init_lsn. Other segment headers are included in compute stream.
45 : async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()>;
46 :
47 : /// Write piece of WAL from buf to disk, but not necessarily sync it.
48 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
49 :
50 : /// Truncate WAL at specified LSN, which must be the end of WAL record.
51 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
52 :
53 : /// Durably store WAL on disk, up to the last written WAL record.
54 : async fn flush_wal(&mut self) -> Result<()>;
55 :
56 : /// Remove all segments <= given segno. Returns function doing that as we
57 : /// want to perform it without timeline lock.
58 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
59 :
60 : /// Release resources associated with the storage -- technically, close FDs.
61 : /// Currently we don't remove timelines until restart (#3146), so need to
62 : /// spare descriptors. This would be useful for temporary tli detach as
63 : /// well.
64 0 : fn close(&mut self) {}
65 :
66 : /// Get metrics for this timeline.
67 : fn get_metrics(&self) -> WalStorageMetrics;
68 : }
69 :
70 : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
71 : /// for better performance. Storage is initialized in the constructor.
72 : ///
73 : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
74 : /// its filename and may be not fully flushed.
75 : ///
76 : /// Relationship of LSNs:
77 : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
78 : ///
79 : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
80 : pub struct PhysicalStorage {
81 : metrics: WalStorageMetrics,
82 : timeline_dir: Utf8PathBuf,
83 : conf: SafeKeeperConf,
84 :
85 : /// Size of WAL segment in bytes.
86 : wal_seg_size: usize,
87 : pg_version: u32,
88 : system_id: u64,
89 :
90 : /// Written to disk, but possibly still in the cache and not fully persisted.
91 : /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
92 : write_lsn: Lsn,
93 :
94 : /// The LSN of the last WAL record written to disk. Still can be not fully flushed.
95 : write_record_lsn: Lsn,
96 :
97 : /// The LSN of the last WAL record flushed to disk.
98 : flush_record_lsn: Lsn,
99 :
100 : /// Decoder is required for detecting boundaries of WAL records.
101 : decoder: WalStreamDecoder,
102 :
103 : /// Cached open file for the last segment.
104 : ///
105 : /// If Some(file) is open, then it always:
106 : /// - has ".partial" suffix
107 : /// - points to write_lsn, so no seek is needed for writing
108 : /// - doesn't point to the end of the segment
109 : file: Option<File>,
110 :
111 : /// When false, we have just initialized storage using the LSN from find_end_of_wal().
112 : /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
113 : /// there can be a case with unexpected .partial file.
114 : ///
115 : /// Imagine the following:
116 : /// - 000000010000000000000001
117 : /// - it was fully written, but the last record is split between 2 segments
118 : /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
119 : /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
120 : /// - 000000010000000000000002.partial
121 : /// - it has only 1 byte written, which is not enough to make a full WAL record
122 : ///
123 : /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
124 : /// This flag will be set to true after the first truncate_wal() call.
125 : ///
126 : /// [`write_lsn`]: Self::write_lsn
127 : is_truncated_after_restart: bool,
128 : }
129 :
130 : impl PhysicalStorage {
131 : /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
132 : /// the disk. Otherwise, all LSNs are set to zero.
133 0 : pub fn new(
134 0 : ttid: &TenantTimelineId,
135 0 : timeline_dir: Utf8PathBuf,
136 0 : conf: &SafeKeeperConf,
137 0 : state: &TimelinePersistentState,
138 0 : ) -> Result<PhysicalStorage> {
139 0 : let wal_seg_size = state.server.wal_seg_size as usize;
140 :
141 : // Find out where stored WAL ends, starting at commit_lsn which is a
142 : // known recent record boundary (unless we don't have WAL at all).
143 : //
144 : // NB: find_end_of_wal MUST be backwards compatible with the previously
145 : // written WAL. If find_end_of_wal fails to read any WAL written by an
146 : // older version of the code, we could lose data forever.
147 0 : let write_lsn = if state.commit_lsn == Lsn(0) {
148 0 : Lsn(0)
149 : } else {
150 0 : let version = state.server.pg_version / 10000;
151 0 :
152 0 : dispatch_pgversion!(
153 0 : version,
154 0 : pgv::xlog_utils::find_end_of_wal(
155 0 : timeline_dir.as_std_path(),
156 0 : wal_seg_size,
157 0 : state.commit_lsn,
158 0 : )?,
159 0 : bail!("unsupported postgres version: {}", version)
160 : )
161 : };
162 :
163 : // TODO: do we really know that write_lsn is fully flushed to disk?
164 : // If not, maybe it's better to call fsync() here to be sure?
165 0 : let flush_lsn = write_lsn;
166 0 :
167 0 : debug!(
168 0 : "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
169 : ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
170 : );
171 0 : if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
172 0 : warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
173 0 : }
174 :
175 0 : Ok(PhysicalStorage {
176 0 : metrics: WalStorageMetrics::default(),
177 0 : timeline_dir,
178 0 : conf: conf.clone(),
179 0 : wal_seg_size,
180 0 : pg_version: state.server.pg_version,
181 0 : system_id: state.server.system_id,
182 0 : write_lsn,
183 0 : write_record_lsn: write_lsn,
184 0 : flush_record_lsn: flush_lsn,
185 0 : decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
186 0 : file: None,
187 0 : is_truncated_after_restart: false,
188 0 : })
189 0 : }
190 :
191 : /// Get all known state of the storage.
192 0 : pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
193 0 : (
194 0 : self.write_lsn,
195 0 : self.write_record_lsn,
196 0 : self.flush_record_lsn,
197 0 : self.file.is_some(),
198 0 : )
199 0 : }
200 :
201 : /// Call fdatasync if config requires so.
202 0 : async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
203 0 : if !self.conf.no_sync {
204 0 : self.metrics
205 0 : .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
206 0 : }
207 0 : Ok(())
208 0 : }
209 :
210 : /// Open or create WAL segment file. Caller must call seek to the wanted position.
211 : /// Returns `file` and `is_partial`.
212 0 : async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
213 0 : let (wal_file_path, wal_file_partial_path) =
214 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
215 :
216 : // Try to open already completed segment
217 0 : if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
218 0 : Ok((file, false))
219 0 : } else if let Ok(file) = OpenOptions::new()
220 0 : .write(true)
221 0 : .open(&wal_file_partial_path)
222 0 : .await
223 : {
224 : // Try to open existing partial file
225 0 : Ok((file, true))
226 : } else {
227 : // Create and fill new partial file
228 : //
229 : // We're using fdatasync during WAL writing, so file size must not
230 : // change; to this end it is filled with zeros here. To avoid using
231 : // half initialized segment, first bake it under tmp filename and
232 : // then rename.
233 0 : let tmp_path = self.timeline_dir.join("waltmp");
234 : #[allow(clippy::suspicious_open_options)]
235 0 : let mut file = OpenOptions::new()
236 0 : .create(true)
237 0 : .write(true)
238 0 : .open(&tmp_path)
239 0 : .await
240 0 : .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
241 :
242 0 : write_zeroes(&mut file, self.wal_seg_size).await?;
243 :
244 : // Note: this doesn't get into observe_flush_seconds metric. But
245 : // segment init should be separate metric, if any.
246 0 : if let Err(e) =
247 0 : durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
248 : {
249 : // Probably rename succeeded, but fsync of it failed. Remove
250 : // the file then to avoid using it.
251 0 : remove_file(wal_file_partial_path)
252 0 : .await
253 0 : .or_else(utils::fs_ext::ignore_not_found)?;
254 0 : return Err(e.into());
255 0 : }
256 0 : Ok((file, true))
257 : }
258 0 : }
259 :
260 : /// Write WAL bytes, which are known to be located in a single WAL segment.
261 0 : async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
262 0 : let mut file = if let Some(file) = self.file.take() {
263 0 : file
264 : } else {
265 0 : let (mut file, is_partial) = self.open_or_create(segno).await?;
266 0 : assert!(is_partial, "unexpected write into non-partial segment file");
267 0 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
268 0 : file
269 : };
270 :
271 0 : file.write_all(buf).await?;
272 : // Note: flush just ensures write above reaches the OS (this is not
273 : // needed in case of sync IO as Write::write there calls directly write
274 : // syscall, but needed in case of async). It does *not* fsyncs the file.
275 0 : file.flush().await?;
276 :
277 0 : if xlogoff + buf.len() == self.wal_seg_size {
278 : // If we reached the end of a WAL segment, flush and close it.
279 0 : self.fdatasync_file(&file).await?;
280 :
281 : // Rename partial file to completed file
282 0 : let (wal_file_path, wal_file_partial_path) =
283 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
284 0 : fs::rename(wal_file_partial_path, wal_file_path).await?;
285 0 : } else {
286 0 : // otherwise, file can be reused later
287 0 : self.file = Some(file);
288 0 : }
289 :
290 0 : Ok(())
291 0 : }
292 :
293 : /// Writes WAL to the segment files, until everything is writed. If some segments
294 : /// are fully written, they are flushed to disk. The last (partial) segment can
295 : /// be flushed separately later.
296 : ///
297 : /// Updates `write_lsn`.
298 0 : async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
299 0 : if self.write_lsn != pos {
300 : // need to flush the file before discarding it
301 0 : if let Some(file) = self.file.take() {
302 0 : self.fdatasync_file(&file).await?;
303 0 : }
304 :
305 0 : self.write_lsn = pos;
306 0 : }
307 :
308 0 : while !buf.is_empty() {
309 : // Extract WAL location for this block
310 0 : let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
311 0 : let segno = self.write_lsn.segment_number(self.wal_seg_size);
312 :
313 : // If crossing a WAL boundary, only write up until we reach wal segment size.
314 0 : let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
315 0 : self.wal_seg_size - xlogoff
316 : } else {
317 0 : buf.len()
318 : };
319 :
320 0 : self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
321 0 : .await?;
322 0 : self.write_lsn += bytes_write as u64;
323 0 : buf = &buf[bytes_write..];
324 : }
325 :
326 0 : Ok(())
327 0 : }
328 : }
329 :
330 : #[async_trait::async_trait]
331 : impl Storage for PhysicalStorage {
332 : /// flush_lsn returns LSN of last durably stored WAL record.
333 0 : fn flush_lsn(&self) -> Lsn {
334 0 : self.flush_record_lsn
335 0 : }
336 :
337 0 : async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
338 0 : let segno = init_lsn.segment_number(self.wal_seg_size);
339 0 : let (mut file, _) = self.open_or_create(segno).await?;
340 0 : let major_pg_version = self.pg_version / 10000;
341 0 : let wal_seg =
342 0 : postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
343 0 : file.seek(SeekFrom::Start(0)).await?;
344 0 : file.write_all(&wal_seg).await?;
345 0 : file.flush().await?;
346 0 : info!("initialized segno {} at lsn {}", segno, init_lsn);
347 0 : // note: file is *not* fsynced
348 0 : Ok(())
349 0 : }
350 :
351 : /// Write WAL to disk.
352 0 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
353 0 : // Disallow any non-sequential writes, which can result in gaps or overwrites.
354 0 : // If we need to move the pointer, use truncate_wal() instead.
355 0 : if self.write_lsn > startpos {
356 0 : bail!(
357 0 : "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
358 0 : self.write_lsn,
359 0 : startpos
360 0 : );
361 0 : }
362 0 : if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
363 0 : bail!(
364 0 : "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
365 0 : self.write_lsn,
366 0 : startpos
367 0 : );
368 0 : }
369 0 :
370 0 : let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
371 0 : // WAL is written, updating write metrics
372 0 : self.metrics.observe_write_seconds(write_seconds);
373 0 : self.metrics.observe_write_bytes(buf.len());
374 0 :
375 0 : // figure out last record's end lsn for reporting (if we got the
376 0 : // whole record)
377 0 : if self.decoder.available() != startpos {
378 0 : info!(
379 0 : "restart decoder from {} to {}",
380 0 : self.decoder.available(),
381 0 : startpos,
382 0 : );
383 0 : let pg_version = self.decoder.pg_version;
384 0 : self.decoder = WalStreamDecoder::new(startpos, pg_version);
385 0 : }
386 0 : self.decoder.feed_bytes(buf);
387 0 : loop {
388 0 : match self.decoder.poll_decode()? {
389 0 : None => break, // no full record yet
390 0 : Some((lsn, _rec)) => {
391 0 : self.write_record_lsn = lsn;
392 0 : }
393 0 : }
394 0 : }
395 0 :
396 0 : Ok(())
397 0 : }
398 :
399 0 : async fn flush_wal(&mut self) -> Result<()> {
400 0 : if self.flush_record_lsn == self.write_record_lsn {
401 0 : // no need to do extra flush
402 0 : return Ok(());
403 0 : }
404 0 :
405 0 : if let Some(unflushed_file) = self.file.take() {
406 0 : self.fdatasync_file(&unflushed_file).await?;
407 0 : self.file = Some(unflushed_file);
408 0 : } else {
409 0 : // We have unflushed data (write_lsn != flush_lsn), but no file.
410 0 : // This should only happen if last file was fully written and flushed,
411 0 : // but haven't updated flush_lsn yet.
412 0 : if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
413 0 : bail!(
414 0 : "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
415 0 : self.write_lsn,
416 0 : self.flush_record_lsn
417 0 : );
418 0 : }
419 0 : }
420 0 :
421 0 : // everything is flushed now, let's update flush_lsn
422 0 : self.flush_record_lsn = self.write_record_lsn;
423 0 : Ok(())
424 0 : }
425 :
426 : /// Truncate written WAL by removing all WAL segments after the given LSN.
427 : /// end_pos must point to the end of the WAL record.
428 0 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
429 0 : // Streaming must not create a hole, so truncate cannot be called on non-written lsn
430 0 : if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
431 0 : bail!(
432 0 : "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
433 0 : self.write_lsn,
434 0 : end_pos
435 0 : );
436 0 : }
437 0 :
438 0 : // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
439 0 : // disk (this happens on each connect).
440 0 : if self.is_truncated_after_restart
441 0 : && end_pos == self.write_lsn
442 0 : && end_pos == self.flush_record_lsn
443 0 : {
444 0 : return Ok(());
445 0 : }
446 0 :
447 0 : // Close previously opened file, if any
448 0 : if let Some(unflushed_file) = self.file.take() {
449 0 : self.fdatasync_file(&unflushed_file).await?;
450 0 : }
451 0 :
452 0 : let xlogoff = end_pos.segment_offset(self.wal_seg_size);
453 0 : let segno = end_pos.segment_number(self.wal_seg_size);
454 0 :
455 0 : // Remove all segments after the given LSN.
456 0 : remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
457 0 :
458 0 : let (mut file, is_partial) = self.open_or_create(segno).await?;
459 0 :
460 0 : // Fill end with zeroes
461 0 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
462 0 : write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
463 0 : self.fdatasync_file(&file).await?;
464 0 :
465 0 : if !is_partial {
466 0 : // Make segment partial once again
467 0 : let (wal_file_path, wal_file_partial_path) =
468 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
469 0 : fs::rename(wal_file_path, wal_file_partial_path).await?;
470 0 : }
471 0 :
472 0 : // Update LSNs
473 0 : self.write_lsn = end_pos;
474 0 : self.write_record_lsn = end_pos;
475 0 : self.flush_record_lsn = end_pos;
476 0 : self.is_truncated_after_restart = true;
477 0 : Ok(())
478 0 : }
479 :
480 0 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
481 0 : let timeline_dir = self.timeline_dir.clone();
482 0 : let wal_seg_size = self.wal_seg_size;
483 0 : Box::pin(async move {
484 0 : remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
485 0 : })
486 0 : }
487 :
488 0 : fn close(&mut self) {
489 0 : // close happens in destructor
490 0 : let _open_file = self.file.take();
491 0 : }
492 :
493 0 : fn get_metrics(&self) -> WalStorageMetrics {
494 0 : self.metrics.clone()
495 0 : }
496 : }
497 :
498 : /// Remove all WAL segments in timeline_dir that match the given predicate.
499 0 : async fn remove_segments_from_disk(
500 0 : timeline_dir: &Utf8Path,
501 0 : wal_seg_size: usize,
502 0 : remove_predicate: impl Fn(XLogSegNo) -> bool,
503 0 : ) -> Result<()> {
504 0 : let mut n_removed = 0;
505 0 : let mut min_removed = u64::MAX;
506 0 : let mut max_removed = u64::MIN;
507 :
508 0 : let mut entries = fs::read_dir(timeline_dir).await?;
509 0 : while let Some(entry) = entries.next_entry().await? {
510 0 : let entry_path = entry.path();
511 0 : let fname = entry_path.file_name().unwrap();
512 :
513 0 : if let Some(fname_str) = fname.to_str() {
514 : /* Ignore files that are not XLOG segments */
515 0 : if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
516 0 : continue;
517 0 : }
518 0 : let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
519 0 : if remove_predicate(segno) {
520 0 : remove_file(entry_path).await?;
521 0 : n_removed += 1;
522 0 : min_removed = min(min_removed, segno);
523 0 : max_removed = max(max_removed, segno);
524 0 : REMOVED_WAL_SEGMENTS.inc();
525 0 : }
526 0 : }
527 : }
528 :
529 0 : if n_removed > 0 {
530 0 : info!(
531 0 : "removed {} WAL segments [{}; {}]",
532 : n_removed, min_removed, max_removed
533 : );
534 0 : }
535 0 : Ok(())
536 0 : }
537 :
538 : pub struct WalReader {
539 : remote_path: RemotePath,
540 : timeline_dir: Utf8PathBuf,
541 : wal_seg_size: usize,
542 : pos: Lsn,
543 : wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
544 :
545 : // S3 will be used to read WAL if LSN is not available locally
546 : enable_remote_read: bool,
547 :
548 : // We don't have WAL locally if LSN is less than local_start_lsn
549 : local_start_lsn: Lsn,
550 : // We will respond with zero-ed bytes before this Lsn as long as
551 : // pos is in the same segment as timeline_start_lsn.
552 : timeline_start_lsn: Lsn,
553 : // integer version number of PostgreSQL, e.g. 14; 15; 16
554 : pg_version: u32,
555 : system_id: SystemId,
556 : timeline_start_segment: Option<Bytes>,
557 : }
558 :
559 : impl WalReader {
560 0 : pub fn new(
561 0 : ttid: &TenantTimelineId,
562 0 : timeline_dir: Utf8PathBuf,
563 0 : state: &TimelinePersistentState,
564 0 : start_pos: Lsn,
565 0 : enable_remote_read: bool,
566 0 : ) -> Result<Self> {
567 0 : if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
568 0 : bail!("state uninitialized, no data to read");
569 0 : }
570 0 :
571 0 : // TODO: Upgrade to bail!() once we know this couldn't possibly happen
572 0 : if state.timeline_start_lsn == Lsn(0) {
573 0 : warn!("timeline_start_lsn uninitialized before initializing wal reader");
574 0 : }
575 :
576 0 : if start_pos
577 0 : < state
578 0 : .timeline_start_lsn
579 0 : .segment_lsn(state.server.wal_seg_size as usize)
580 : {
581 0 : bail!(
582 0 : "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
583 0 : start_pos,
584 0 : state.timeline_start_lsn
585 0 : );
586 0 : }
587 0 :
588 0 : Ok(Self {
589 0 : remote_path: remote_timeline_path(ttid)?,
590 0 : timeline_dir,
591 0 : wal_seg_size: state.server.wal_seg_size as usize,
592 0 : pos: start_pos,
593 0 : wal_segment: None,
594 0 : enable_remote_read,
595 0 : local_start_lsn: state.local_start_lsn,
596 0 : timeline_start_lsn: state.timeline_start_lsn,
597 0 : pg_version: state.server.pg_version / 10000,
598 0 : system_id: state.server.system_id,
599 0 : timeline_start_segment: None,
600 : })
601 0 : }
602 :
603 : /// Read WAL at current position into provided buf, returns number of bytes
604 : /// read. It can be smaller than buf size only if segment boundary is
605 : /// reached.
606 0 : pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
607 0 : // If this timeline is new, we may not have a full segment yet, so
608 0 : // we pad the first bytes of the timeline's first WAL segment with 0s
609 0 : if self.pos < self.timeline_start_lsn {
610 0 : debug_assert_eq!(
611 0 : self.pos.segment_number(self.wal_seg_size),
612 0 : self.timeline_start_lsn.segment_number(self.wal_seg_size)
613 : );
614 :
615 : // All bytes after timeline_start_lsn are in WAL, but those before
616 : // are not, so we manually construct an empty segment for the bytes
617 : // not available in this timeline.
618 0 : if self.timeline_start_segment.is_none() {
619 0 : let it = postgres_ffi::generate_wal_segment(
620 0 : self.timeline_start_lsn.segment_number(self.wal_seg_size),
621 0 : self.system_id,
622 0 : self.pg_version,
623 0 : self.timeline_start_lsn,
624 0 : )?;
625 0 : self.timeline_start_segment = Some(it);
626 0 : }
627 :
628 0 : assert!(self.timeline_start_segment.is_some());
629 0 : let segment = self.timeline_start_segment.take().unwrap();
630 0 :
631 0 : let seg_bytes = &segment[..];
632 0 :
633 0 : // How much of the current segment have we already consumed?
634 0 : let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
635 0 :
636 0 : // How many bytes may we consume in total?
637 0 : let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
638 0 :
639 0 : debug_assert!(seg_bytes.len() > pos_seg_offset);
640 0 : debug_assert!(seg_bytes.len() > tl_start_seg_offset);
641 :
642 : // Copy as many bytes as possible into the buffer
643 0 : let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
644 0 : buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
645 0 :
646 0 : self.pos += len as u64;
647 0 :
648 0 : // If we're done with the segment, we can release it's memory.
649 0 : // However, if we're not yet done, store it so that we don't have to
650 0 : // construct the segment the next time this function is called.
651 0 : if self.pos < self.timeline_start_lsn {
652 0 : self.timeline_start_segment = Some(segment);
653 0 : }
654 :
655 0 : return Ok(len);
656 0 : }
657 :
658 0 : let mut wal_segment = match self.wal_segment.take() {
659 0 : Some(reader) => reader,
660 0 : None => self.open_segment().await?,
661 : };
662 :
663 : // How much to read and send in message? We cannot cross the WAL file
664 : // boundary, and we don't want send more than provided buffer.
665 0 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
666 0 : let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
667 0 :
668 0 : // Read some data from the file.
669 0 : let buf = &mut buf[0..send_size];
670 0 : let send_size = wal_segment.read_exact(buf).await?;
671 0 : self.pos += send_size as u64;
672 0 :
673 0 : // Decide whether to reuse this file. If we don't set wal_segment here
674 0 : // a new reader will be opened next time.
675 0 : if self.pos.segment_offset(self.wal_seg_size) != 0 {
676 0 : self.wal_segment = Some(wal_segment);
677 0 : }
678 :
679 0 : Ok(send_size)
680 0 : }
681 :
682 : /// Open WAL segment at the current position of the reader.
683 0 : async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
684 0 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
685 0 : let segno = self.pos.segment_number(self.wal_seg_size);
686 0 : let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
687 0 :
688 0 : // Try to open local file, if we may have WAL locally
689 0 : if self.pos >= self.local_start_lsn {
690 0 : let res = open_wal_file(&self.timeline_dir, segno, self.wal_seg_size).await;
691 0 : match res {
692 0 : Ok((mut file, _)) => {
693 0 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
694 0 : return Ok(Box::pin(file));
695 : }
696 0 : Err(e) => {
697 0 : let is_not_found = e.chain().any(|e| {
698 0 : if let Some(e) = e.downcast_ref::<io::Error>() {
699 0 : e.kind() == io::ErrorKind::NotFound
700 : } else {
701 0 : false
702 : }
703 0 : });
704 0 : if !is_not_found {
705 0 : return Err(e);
706 0 : }
707 : // NotFound is expected, fall through to remote read
708 : }
709 : };
710 0 : }
711 :
712 : // Try to open remote file, if remote reads are enabled
713 0 : if self.enable_remote_read {
714 0 : let remote_wal_file_path = self.remote_path.join(&wal_file_name);
715 0 : return read_object(&remote_wal_file_path, xlogoff as u64).await;
716 0 : }
717 0 :
718 0 : bail!("WAL segment is not found")
719 0 : }
720 : }
721 :
722 : /// Zero block for filling created WAL segments.
723 : const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
724 :
725 : /// Helper for filling file with zeroes.
726 0 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
727 0 : fail::fail_point!("sk-write-zeroes", |_| {
728 0 : info!("write_zeroes hit failpoint");
729 0 : Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
730 0 : });
731 :
732 0 : while count >= XLOG_BLCKSZ {
733 0 : file.write_all(ZERO_BLOCK).await?;
734 0 : count -= XLOG_BLCKSZ;
735 : }
736 0 : file.write_all(&ZERO_BLOCK[0..count]).await?;
737 0 : file.flush().await?;
738 0 : Ok(())
739 0 : }
740 :
741 : /// Helper function for opening WAL segment `segno` in `dir`. Returns file and
742 : /// whether it is .partial.
743 0 : pub(crate) async fn open_wal_file(
744 0 : timeline_dir: &Utf8Path,
745 0 : segno: XLogSegNo,
746 0 : wal_seg_size: usize,
747 0 : ) -> Result<(tokio::fs::File, bool)> {
748 0 : let (wal_file_path, wal_file_partial_path) = wal_file_paths(timeline_dir, segno, wal_seg_size)?;
749 :
750 : // First try to open the .partial file.
751 0 : let mut partial_path = wal_file_path.to_owned();
752 0 : partial_path.set_extension("partial");
753 0 : if let Ok(opened_file) = tokio::fs::File::open(&wal_file_partial_path).await {
754 0 : return Ok((opened_file, true));
755 0 : }
756 :
757 : // If that failed, try it without the .partial extension.
758 0 : let pf = tokio::fs::File::open(&wal_file_path)
759 0 : .await
760 0 : .with_context(|| format!("failed to open WAL file {:#}", wal_file_path))
761 0 : .map_err(|e| {
762 0 : warn!("{}", e);
763 0 : e
764 0 : })?;
765 :
766 0 : Ok((pf, false))
767 0 : }
768 :
769 : /// Helper returning full path to WAL segment file and its .partial brother.
770 0 : pub fn wal_file_paths(
771 0 : timeline_dir: &Utf8Path,
772 0 : segno: XLogSegNo,
773 0 : wal_seg_size: usize,
774 0 : ) -> Result<(Utf8PathBuf, Utf8PathBuf)> {
775 0 : let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
776 0 : let wal_file_path = timeline_dir.join(wal_file_name.clone());
777 0 : let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
778 0 : Ok((wal_file_path, wal_file_partial_path))
779 0 : }
|