Line data Source code
1 : //! This module has everything to deal with WAL -- reading and writing to disk.
2 : //!
3 : //! Safekeeper WAL is stored in the timeline directory, in format similar to pg_wal.
4 : //! PG timeline is always 1, so WAL segments usually have names like this:
5 : //! - 000000010000000000000001
6 : //! - 000000010000000000000002.partial
7 : //!
8 : //! Note that last file has `.partial` suffix, that's different from postgres.
9 :
10 : use anyhow::{bail, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use futures::future::BoxFuture;
14 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
15 : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
16 : use remote_storage::RemotePath;
17 : use std::cmp::{max, min};
18 : use std::io::{self, SeekFrom};
19 : use std::pin::Pin;
20 : use tokio::fs::{self, remove_file, File, OpenOptions};
21 : use tokio::io::{AsyncRead, AsyncWriteExt};
22 : use tokio::io::{AsyncReadExt, AsyncSeekExt};
23 : use tracing::*;
24 : use utils::crashsafe::durable_rename;
25 :
26 : use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
27 : use crate::state::TimelinePersistentState;
28 : use crate::wal_backup::read_object;
29 : use crate::SafeKeeperConf;
30 : use postgres_ffi::waldecoder::WalStreamDecoder;
31 : use postgres_ffi::XLogFileName;
32 : use postgres_ffi::XLOG_BLCKSZ;
33 : use pq_proto::SystemId;
34 : use utils::{id::TenantTimelineId, lsn::Lsn};
35 :
36 : #[async_trait::async_trait]
37 : pub trait Storage {
38 : /// LSN of last durably stored WAL record.
39 : fn flush_lsn(&self) -> Lsn;
40 :
41 : /// Write piece of WAL from buf to disk, but not necessarily sync it.
42 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
43 :
44 : /// Truncate WAL at specified LSN, which must be the end of WAL record.
45 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
46 :
47 : /// Durably store WAL on disk, up to the last written WAL record.
48 : async fn flush_wal(&mut self) -> Result<()>;
49 :
50 : /// Remove all segments <= given segno. Returns function doing that as we
51 : /// want to perform it without timeline lock.
52 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
53 :
54 : /// Release resources associated with the storage -- technically, close FDs.
55 : /// Currently we don't remove timelines until restart (#3146), so need to
56 : /// spare descriptors. This would be useful for temporary tli detach as
57 : /// well.
58 0 : fn close(&mut self) {}
59 :
60 : /// Get metrics for this timeline.
61 : fn get_metrics(&self) -> WalStorageMetrics;
62 : }
63 :
64 : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
65 : /// for better performance. Storage is initialized in the constructor.
66 : ///
67 : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
68 : /// its filename and may be not fully flushed.
69 : ///
70 : /// Relationship of LSNs:
71 : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
72 : ///
73 : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
74 : pub struct PhysicalStorage {
75 : metrics: WalStorageMetrics,
76 : timeline_dir: Utf8PathBuf,
77 : conf: SafeKeeperConf,
78 :
79 : /// Size of WAL segment in bytes.
80 : wal_seg_size: usize,
81 :
82 : /// Written to disk, but possibly still in the cache and not fully persisted.
83 : /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
84 : write_lsn: Lsn,
85 :
86 : /// The LSN of the last WAL record written to disk. Still can be not fully flushed.
87 : write_record_lsn: Lsn,
88 :
89 : /// The LSN of the last WAL record flushed to disk.
90 : flush_record_lsn: Lsn,
91 :
92 : /// Decoder is required for detecting boundaries of WAL records.
93 : decoder: WalStreamDecoder,
94 :
95 : /// Cached open file for the last segment.
96 : ///
97 : /// If Some(file) is open, then it always:
98 : /// - has ".partial" suffix
99 : /// - points to write_lsn, so no seek is needed for writing
100 : /// - doesn't point to the end of the segment
101 : file: Option<File>,
102 :
103 : /// When false, we have just initialized storage using the LSN from find_end_of_wal().
104 : /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
105 : /// there can be a case with unexpected .partial file.
106 : ///
107 : /// Imagine the following:
108 : /// - 000000010000000000000001
109 : /// - it was fully written, but the last record is split between 2 segments
110 : /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
111 : /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
112 : /// - 000000010000000000000002.partial
113 : /// - it has only 1 byte written, which is not enough to make a full WAL record
114 : ///
115 : /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
116 : /// This flag will be set to true after the first truncate_wal() call.
117 : ///
118 : /// [`write_lsn`]: Self::write_lsn
119 : is_truncated_after_restart: bool,
120 : }
121 :
122 : impl PhysicalStorage {
123 : /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
124 : /// the disk. Otherwise, all LSNs are set to zero.
125 661 : pub fn new(
126 661 : ttid: &TenantTimelineId,
127 661 : timeline_dir: Utf8PathBuf,
128 661 : conf: &SafeKeeperConf,
129 661 : state: &TimelinePersistentState,
130 661 : ) -> Result<PhysicalStorage> {
131 661 : let wal_seg_size = state.server.wal_seg_size as usize;
132 :
133 : // Find out where stored WAL ends, starting at commit_lsn which is a
134 : // known recent record boundary (unless we don't have WAL at all).
135 : //
136 : // NB: find_end_of_wal MUST be backwards compatible with the previously
137 : // written WAL. If find_end_of_wal fails to read any WAL written by an
138 : // older version of the code, we could lose data forever.
139 661 : let write_lsn = if state.commit_lsn == Lsn(0) {
140 476 : Lsn(0)
141 : } else {
142 185 : let version = state.server.pg_version / 10000;
143 185 :
144 185 : dispatch_pgversion!(
145 185 : version,
146 : pgv::xlog_utils::find_end_of_wal(
147 185 : timeline_dir.as_std_path(),
148 185 : wal_seg_size,
149 185 : state.commit_lsn,
150 0 : )?,
151 0 : bail!("unsupported postgres version: {}", version)
152 : )
153 : };
154 :
155 : // TODO: do we really know that write_lsn is fully flushed to disk?
156 : // If not, maybe it's better to call fsync() here to be sure?
157 661 : let flush_lsn = write_lsn;
158 661 :
159 661 : debug!(
160 0 : "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
161 0 : ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
162 0 : );
163 661 : if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
164 0 : warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
165 661 : }
166 :
167 661 : Ok(PhysicalStorage {
168 661 : metrics: WalStorageMetrics::default(),
169 661 : timeline_dir,
170 661 : conf: conf.clone(),
171 661 : wal_seg_size,
172 661 : write_lsn,
173 661 : write_record_lsn: write_lsn,
174 661 : flush_record_lsn: flush_lsn,
175 661 : decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
176 661 : file: None,
177 661 : is_truncated_after_restart: false,
178 661 : })
179 661 : }
180 :
181 : /// Get all known state of the storage.
182 5 : pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
183 5 : (
184 5 : self.write_lsn,
185 5 : self.write_record_lsn,
186 5 : self.flush_record_lsn,
187 5 : self.file.is_some(),
188 5 : )
189 5 : }
190 :
191 : /// Call fdatasync if config requires so.
192 977169 : async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
193 977169 : if !self.conf.no_sync {
194 0 : self.metrics
195 0 : .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
196 977169 : }
197 977169 : Ok(())
198 977169 : }
199 :
200 : /// Open or create WAL segment file. Caller must call seek to the wanted position.
201 : /// Returns `file` and `is_partial`.
202 1900 : async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
203 1900 : let (wal_file_path, wal_file_partial_path) =
204 1900 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
205 :
206 : // Try to open already completed segment
207 1900 : if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
208 0 : Ok((file, false))
209 1900 : } else if let Ok(file) = OpenOptions::new()
210 1900 : .write(true)
211 1900 : .open(&wal_file_partial_path)
212 1855 : .await
213 : {
214 : // Try to open existing partial file
215 644 : Ok((file, true))
216 : } else {
217 : // Create and fill new partial file
218 : //
219 : // We're using fdatasync during WAL writing, so file size must not
220 : // change; to this end it is filled with zeros here. To avoid using
221 : // half initialized segment, first bake it under tmp filename and
222 : // then rename.
223 1256 : let tmp_path = self.timeline_dir.join("waltmp");
224 1256 : let mut file = OpenOptions::new()
225 1256 : .create(true)
226 1256 : .write(true)
227 1256 : .open(&tmp_path)
228 1220 : .await
229 1256 : .with_context(|| format!("Failed to open tmp wal file {:?}", &tmp_path))?;
230 :
231 2414878 : write_zeroes(&mut file, self.wal_seg_size).await?;
232 :
233 : // Note: this doesn't get into observe_flush_seconds metric. But
234 : // segment init should be separate metric, if any.
235 0 : if let Err(e) =
236 1254 : durable_rename(&tmp_path, &wal_file_partial_path, !self.conf.no_sync).await
237 : {
238 : // Probably rename succeeded, but fsync of it failed. Remove
239 : // the file then to avoid using it.
240 0 : remove_file(wal_file_partial_path)
241 0 : .await
242 0 : .or_else(utils::fs_ext::ignore_not_found)?;
243 0 : return Err(e.into());
244 1254 : }
245 1254 : Ok((file, true))
246 : }
247 1900 : }
248 :
249 : /// Write WAL bytes, which are known to be located in a single WAL segment.
250 1713279 : async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
251 1713279 : let mut file = if let Some(file) = self.file.take() {
252 1711940 : file
253 : } else {
254 1479199 : let (mut file, is_partial) = self.open_or_create(segno).await?;
255 1337 : assert!(is_partial, "unexpected write into non-partial segment file");
256 1337 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
257 1337 : file
258 : };
259 :
260 1713277 : file.write_all(buf).await?;
261 : // Note: flush just ensures write above reaches the OS (this is not
262 : // needed in case of sync IO as Write::write there calls directly write
263 : // syscall, but needed in case of async). It does *not* fsyncs the file.
264 1713277 : file.flush().await?;
265 :
266 1713277 : if xlogoff + buf.len() == self.wal_seg_size {
267 : // If we reached the end of a WAL segment, flush and close it.
268 780 : self.fdatasync_file(&file).await?;
269 :
270 : // Rename partial file to completed file
271 780 : let (wal_file_path, wal_file_partial_path) =
272 780 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
273 780 : fs::rename(wal_file_partial_path, wal_file_path).await?;
274 1712497 : } else {
275 1712497 : // otherwise, file can be reused later
276 1712497 : self.file = Some(file);
277 1712497 : }
278 :
279 1713277 : Ok(())
280 1713279 : }
281 :
282 : /// Writes WAL to the segment files, until everything is writed. If some segments
283 : /// are fully written, they are flushed to disk. The last (partial) segment can
284 : /// be flushed separately later.
285 : ///
286 : /// Updates `write_lsn`.
287 1712564 : async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
288 1712564 : if self.write_lsn != pos {
289 : // need to flush the file before discarding it
290 0 : if let Some(file) = self.file.take() {
291 0 : self.fdatasync_file(&file).await?;
292 0 : }
293 :
294 0 : self.write_lsn = pos;
295 1712564 : }
296 :
297 3425841 : while !buf.is_empty() {
298 : // Extract WAL location for this block
299 1713279 : let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
300 1713279 : let segno = self.write_lsn.segment_number(self.wal_seg_size);
301 :
302 : // If crossing a WAL boundary, only write up until we reach wal segment size.
303 1713279 : let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
304 715 : self.wal_seg_size - xlogoff
305 : } else {
306 1712564 : buf.len()
307 : };
308 :
309 1713279 : self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
310 3147876 : .await?;
311 1713277 : self.write_lsn += bytes_write as u64;
312 1713277 : buf = &buf[bytes_write..];
313 : }
314 :
315 1712562 : Ok(())
316 1712564 : }
317 : }
318 :
319 : #[async_trait::async_trait]
320 : impl Storage for PhysicalStorage {
321 : /// flush_lsn returns LSN of last durably stored WAL record.
322 8710085 : fn flush_lsn(&self) -> Lsn {
323 8710085 : self.flush_record_lsn
324 8710085 : }
325 :
326 : /// Write WAL to disk.
327 1712564 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
328 : // Disallow any non-sequential writes, which can result in gaps or overwrites.
329 : // If we need to move the pointer, use truncate_wal() instead.
330 1712564 : if self.write_lsn > startpos {
331 0 : bail!(
332 0 : "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
333 0 : self.write_lsn,
334 0 : startpos
335 0 : );
336 1712564 : }
337 1712564 : if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
338 0 : bail!(
339 0 : "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
340 0 : self.write_lsn,
341 0 : startpos
342 0 : );
343 1712564 : }
344 :
345 3147876 : let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
346 : // WAL is written, updating write metrics
347 1712562 : self.metrics.observe_write_seconds(write_seconds);
348 1712562 : self.metrics.observe_write_bytes(buf.len());
349 1712562 :
350 1712562 : // figure out last record's end lsn for reporting (if we got the
351 1712562 : // whole record)
352 1712562 : if self.decoder.available() != startpos {
353 483 : info!(
354 483 : "restart decoder from {} to {}",
355 483 : self.decoder.available(),
356 483 : startpos,
357 483 : );
358 483 : let pg_version = self.decoder.pg_version;
359 483 : self.decoder = WalStreamDecoder::new(startpos, pg_version);
360 1712079 : }
361 1712562 : self.decoder.feed_bytes(buf);
362 : loop {
363 83977895 : match self.decoder.poll_decode()? {
364 1712562 : None => break, // no full record yet
365 82265333 : Some((lsn, _rec)) => {
366 82265333 : self.write_record_lsn = lsn;
367 82265333 : }
368 : }
369 : }
370 :
371 1712562 : Ok(())
372 5137692 : }
373 :
374 1662134 : async fn flush_wal(&mut self) -> Result<()> {
375 1662134 : if self.flush_record_lsn == self.write_record_lsn {
376 : // no need to do extra flush
377 686298 : return Ok(());
378 975836 : }
379 :
380 975836 : if let Some(unflushed_file) = self.file.take() {
381 975819 : self.fdatasync_file(&unflushed_file).await?;
382 975819 : self.file = Some(unflushed_file);
383 : } else {
384 : // We have unflushed data (write_lsn != flush_lsn), but no file.
385 : // This should only happen if last file was fully written and flushed,
386 : // but haven't updated flush_lsn yet.
387 17 : if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
388 0 : bail!(
389 0 : "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
390 0 : self.write_lsn,
391 0 : self.flush_record_lsn
392 0 : );
393 17 : }
394 : }
395 :
396 : // everything is flushed now, let's update flush_lsn
397 975836 : self.flush_record_lsn = self.write_record_lsn;
398 975836 : Ok(())
399 4986402 : }
400 :
401 : /// Truncate written WAL by removing all WAL segments after the given LSN.
402 : /// end_pos must point to the end of the WAL record.
403 868 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
404 : // Streaming must not create a hole, so truncate cannot be called on non-written lsn
405 868 : if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
406 0 : bail!(
407 0 : "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
408 0 : self.write_lsn,
409 0 : end_pos
410 0 : );
411 868 : }
412 868 :
413 868 : // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
414 868 : // disk (this happens on each connect).
415 868 : if self.is_truncated_after_restart
416 316 : && end_pos == self.write_lsn
417 307 : && end_pos == self.flush_record_lsn
418 : {
419 307 : return Ok(());
420 561 : }
421 :
422 : // Close previously opened file, if any
423 561 : if let Some(unflushed_file) = self.file.take() {
424 9 : self.fdatasync_file(&unflushed_file).await?;
425 552 : }
426 :
427 561 : let xlogoff = end_pos.segment_offset(self.wal_seg_size);
428 561 : let segno = end_pos.segment_number(self.wal_seg_size);
429 561 :
430 561 : // Remove all segments after the given LSN.
431 561 : remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
432 :
433 941791 : let (mut file, is_partial) = self.open_or_create(segno).await?;
434 :
435 : // Fill end with zeroes
436 561 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
437 645393 : write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
438 561 : self.fdatasync_file(&file).await?;
439 :
440 561 : if !is_partial {
441 : // Make segment partial once again
442 0 : let (wal_file_path, wal_file_partial_path) =
443 0 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
444 0 : fs::rename(wal_file_path, wal_file_partial_path).await?;
445 561 : }
446 :
447 : // Update LSNs
448 561 : self.write_lsn = end_pos;
449 561 : self.write_record_lsn = end_pos;
450 561 : self.flush_record_lsn = end_pos;
451 561 : self.is_truncated_after_restart = true;
452 561 : Ok(())
453 2604 : }
454 :
455 31 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
456 31 : let timeline_dir = self.timeline_dir.clone();
457 31 : let wal_seg_size = self.wal_seg_size;
458 31 : Box::pin(async move {
459 65 : remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
460 31 : })
461 31 : }
462 :
463 28 : fn close(&mut self) {
464 28 : // close happens in destructor
465 28 : let _open_file = self.file.take();
466 28 : }
467 :
468 51 : fn get_metrics(&self) -> WalStorageMetrics {
469 51 : self.metrics.clone()
470 51 : }
471 : }
472 :
473 : /// Remove all WAL segments in timeline_dir that match the given predicate.
474 592 : async fn remove_segments_from_disk(
475 592 : timeline_dir: &Utf8Path,
476 592 : wal_seg_size: usize,
477 592 : remove_predicate: impl Fn(XLogSegNo) -> bool,
478 592 : ) -> Result<()> {
479 592 : let mut n_removed = 0;
480 592 : let mut min_removed = u64::MAX;
481 592 : let mut max_removed = u64::MIN;
482 :
483 592 : let mut entries = fs::read_dir(timeline_dir).await?;
484 1446 : while let Some(entry) = entries.next_entry().await? {
485 854 : let entry_path = entry.path();
486 854 : let fname = entry_path.file_name().unwrap();
487 :
488 854 : if let Some(fname_str) = fname.to_str() {
489 : /* Ignore files that are not XLOG segments */
490 854 : if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
491 593 : continue;
492 261 : }
493 261 : let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
494 261 : if remove_predicate(segno) {
495 14 : remove_file(entry_path).await?;
496 14 : n_removed += 1;
497 14 : min_removed = min(min_removed, segno);
498 14 : max_removed = max(max_removed, segno);
499 14 : REMOVED_WAL_SEGMENTS.inc();
500 247 : }
501 0 : }
502 : }
503 :
504 592 : if n_removed > 0 {
505 7 : info!(
506 7 : "removed {} WAL segments [{}; {}]",
507 7 : n_removed, min_removed, max_removed
508 7 : );
509 585 : }
510 592 : Ok(())
511 592 : }
512 :
513 : pub struct WalReader {
514 : workdir: Utf8PathBuf,
515 : timeline_dir: Utf8PathBuf,
516 : wal_seg_size: usize,
517 : pos: Lsn,
518 : wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
519 :
520 : // S3 will be used to read WAL if LSN is not available locally
521 : enable_remote_read: bool,
522 :
523 : // We don't have WAL locally if LSN is less than local_start_lsn
524 : local_start_lsn: Lsn,
525 : // We will respond with zero-ed bytes before this Lsn as long as
526 : // pos is in the same segment as timeline_start_lsn.
527 : timeline_start_lsn: Lsn,
528 : // integer version number of PostgreSQL, e.g. 14; 15; 16
529 : pg_version: u32,
530 : system_id: SystemId,
531 : timeline_start_segment: Option<Bytes>,
532 : }
533 :
534 : impl WalReader {
535 868 : pub fn new(
536 868 : workdir: Utf8PathBuf,
537 868 : timeline_dir: Utf8PathBuf,
538 868 : state: &TimelinePersistentState,
539 868 : start_pos: Lsn,
540 868 : enable_remote_read: bool,
541 868 : ) -> Result<Self> {
542 868 : if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
543 0 : bail!("state uninitialized, no data to read");
544 868 : }
545 868 :
546 868 : // TODO: Upgrade to bail!() once we know this couldn't possibly happen
547 868 : if state.timeline_start_lsn == Lsn(0) {
548 2 : warn!("timeline_start_lsn uninitialized before initializing wal reader");
549 866 : }
550 :
551 868 : if start_pos
552 868 : < state
553 868 : .timeline_start_lsn
554 868 : .segment_lsn(state.server.wal_seg_size as usize)
555 : {
556 0 : bail!(
557 0 : "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
558 0 : start_pos,
559 0 : state.timeline_start_lsn
560 0 : );
561 868 : }
562 868 :
563 868 : Ok(Self {
564 868 : workdir,
565 868 : timeline_dir,
566 868 : wal_seg_size: state.server.wal_seg_size as usize,
567 868 : pos: start_pos,
568 868 : wal_segment: None,
569 868 : enable_remote_read,
570 868 : local_start_lsn: state.local_start_lsn,
571 868 : timeline_start_lsn: state.timeline_start_lsn,
572 868 : pg_version: state.server.pg_version / 10000,
573 868 : system_id: state.server.system_id,
574 868 : timeline_start_segment: None,
575 868 : })
576 868 : }
577 :
578 : /// Read WAL at current position into provided buf, returns number of bytes
579 : /// read. It can be smaller than buf size only if segment boundary is
580 : /// reached.
581 807471 : pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
582 807471 : // If this timeline is new, we may not have a full segment yet, so
583 807471 : // we pad the first bytes of the timeline's first WAL segment with 0s
584 807471 : if self.pos < self.timeline_start_lsn {
585 : debug_assert_eq!(
586 318 : self.pos.segment_number(self.wal_seg_size),
587 318 : self.timeline_start_lsn.segment_number(self.wal_seg_size)
588 : );
589 :
590 : // All bytes after timeline_start_lsn are in WAL, but those before
591 : // are not, so we manually construct an empty segment for the bytes
592 : // not available in this timeline.
593 318 : if self.timeline_start_segment.is_none() {
594 6 : let it = postgres_ffi::generate_wal_segment(
595 6 : self.timeline_start_lsn.segment_number(self.wal_seg_size),
596 6 : self.system_id,
597 6 : self.pg_version,
598 6 : self.timeline_start_lsn,
599 6 : )?;
600 6 : self.timeline_start_segment = Some(it);
601 312 : }
602 :
603 318 : assert!(self.timeline_start_segment.is_some());
604 318 : let segment = self.timeline_start_segment.take().unwrap();
605 318 :
606 318 : let seg_bytes = &segment[..];
607 318 :
608 318 : // How much of the current segment have we already consumed?
609 318 : let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
610 318 :
611 318 : // How many bytes may we consume in total?
612 318 : let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
613 :
614 318 : debug_assert!(seg_bytes.len() > pos_seg_offset);
615 318 : debug_assert!(seg_bytes.len() > tl_start_seg_offset);
616 :
617 : // Copy as many bytes as possible into the buffer
618 318 : let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
619 318 : buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
620 318 :
621 318 : self.pos += len as u64;
622 318 :
623 318 : // If we're done with the segment, we can release it's memory.
624 318 : // However, if we're not yet done, store it so that we don't have to
625 318 : // construct the segment the next time this function is called.
626 318 : if self.pos < self.timeline_start_lsn {
627 312 : self.timeline_start_segment = Some(segment);
628 312 : }
629 :
630 318 : return Ok(len);
631 807153 : }
632 :
633 807153 : let mut wal_segment = match self.wal_segment.take() {
634 805520 : Some(reader) => reader,
635 3499 : None => self.open_segment().await?,
636 : };
637 :
638 : // How much to read and send in message? We cannot cross the WAL file
639 : // boundary, and we don't want send more than provided buffer.
640 807153 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
641 807153 : let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
642 807153 :
643 807153 : // Read some data from the file.
644 807153 : let buf = &mut buf[0..send_size];
645 807153 : let send_size = wal_segment.read_exact(buf).await?;
646 807150 : self.pos += send_size as u64;
647 807150 :
648 807150 : // Decide whether to reuse this file. If we don't set wal_segment here
649 807150 : // a new reader will be opened next time.
650 807150 : if self.pos.segment_offset(self.wal_seg_size) != 0 {
651 806295 : self.wal_segment = Some(wal_segment);
652 806295 : }
653 :
654 807150 : Ok(send_size)
655 807468 : }
656 :
657 : /// Open WAL segment at the current position of the reader.
658 1633 : async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
659 1633 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
660 1633 : let segno = self.pos.segment_number(self.wal_seg_size);
661 1633 : let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
662 1633 : let wal_file_path = self.timeline_dir.join(wal_file_name);
663 1633 :
664 1633 : // Try to open local file, if we may have WAL locally
665 1633 : if self.pos >= self.local_start_lsn {
666 1824 : let res = Self::open_wal_file(&wal_file_path).await;
667 1631 : match res {
668 1595 : Ok(mut file) => {
669 1595 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
670 1595 : return Ok(Box::pin(file));
671 : }
672 36 : Err(e) => {
673 72 : let is_not_found = e.chain().any(|e| {
674 72 : if let Some(e) = e.downcast_ref::<io::Error>() {
675 36 : e.kind() == io::ErrorKind::NotFound
676 : } else {
677 36 : false
678 : }
679 72 : });
680 36 : if !is_not_found {
681 0 : return Err(e);
682 36 : }
683 : // NotFound is expected, fall through to remote read
684 : }
685 : };
686 2 : }
687 :
688 : // Try to open remote file, if remote reads are enabled
689 38 : if self.enable_remote_read {
690 38 : let remote_wal_file_path = wal_file_path
691 38 : .strip_prefix(&self.workdir)
692 38 : .context("Failed to strip workdir prefix")
693 38 : .and_then(RemotePath::new)
694 38 : .with_context(|| {
695 0 : format!(
696 0 : "Failed to resolve remote part of path {:?} for base {:?}",
697 0 : wal_file_path, self.workdir,
698 0 : )
699 38 : })?;
700 113 : return read_object(&remote_wal_file_path, xlogoff as u64).await;
701 0 : }
702 0 :
703 0 : bail!("WAL segment is not found")
704 1633 : }
705 :
706 : /// Helper function for opening a wal file.
707 1631 : async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
708 1631 : // First try to open the .partial file.
709 1631 : let mut partial_path = wal_file_path.to_owned();
710 1631 : partial_path.set_extension("partial");
711 1631 : if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
712 1410 : return Ok(opened_file);
713 221 : }
714 221 :
715 221 : // If that failed, try it without the .partial extension.
716 221 : tokio::fs::File::open(&wal_file_path)
717 218 : .await
718 221 : .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
719 221 : .map_err(|e| {
720 36 : warn!("{}", e);
721 36 : e
722 221 : })
723 1631 : }
724 : }
725 :
726 : /// Zero block for filling created WAL segments.
727 : const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
728 :
729 : /// Helper for filling file with zeroes.
730 1817 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
731 1817 : fail::fail_point!("sk-write-zeroes", |_| {
732 2 : info!("write_zeroes hit failpoint");
733 2 : Err(anyhow::anyhow!("failpoint: sk-write-zeroes"))
734 1817 : });
735 :
736 3235841 : while count >= XLOG_BLCKSZ {
737 3234026 : file.write_all(ZERO_BLOCK).await?;
738 3234026 : count -= XLOG_BLCKSZ;
739 : }
740 1815 : file.write_all(&ZERO_BLOCK[0..count]).await?;
741 1815 : file.flush().await?;
742 1815 : Ok(())
743 1817 : }
744 :
745 : /// Helper returning full path to WAL segment file and its .partial brother.
746 2728 : pub fn wal_file_paths(
747 2728 : timeline_dir: &Utf8Path,
748 2728 : segno: XLogSegNo,
749 2728 : wal_seg_size: usize,
750 2728 : ) -> Result<(Utf8PathBuf, Utf8PathBuf)> {
751 2728 : let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
752 2728 : let wal_file_path = timeline_dir.join(wal_file_name.clone());
753 2728 : let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
754 2728 : Ok((wal_file_path, wal_file_partial_path))
755 2728 : }
|