TLA Line data Source code
1 : //! This module has everything to deal with WAL -- reading and writing to disk.
2 : //!
3 : //! Safekeeper WAL is stored in the timeline directory, in a format similar to pg_wal.
4 : //! PG timeline is always 1, so WAL segments usually have names like this:
5 : //! - 000000010000000000000001
6 : //! - 000000010000000000000002.partial
7 : //!
8 : //! Note that the last file has a `.partial` suffix, which is different from postgres.
9 :
10 : use anyhow::{bail, Context, Result};
11 : use bytes::Bytes;
12 : use camino::{Utf8Path, Utf8PathBuf};
13 : use futures::future::BoxFuture;
14 : use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
15 : use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
16 : use remote_storage::RemotePath;
17 : use std::cmp::{max, min};
18 : use std::io::{self, SeekFrom};
19 : use std::pin::Pin;
20 : use tokio::fs::{self, remove_file, File, OpenOptions};
21 : use tokio::io::{AsyncRead, AsyncWriteExt};
22 : use tokio::io::{AsyncReadExt, AsyncSeekExt};
23 : use tracing::*;
24 :
25 : use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS};
26 : use crate::safekeeper::SafeKeeperState;
27 : use crate::wal_backup::read_object;
28 : use crate::SafeKeeperConf;
29 : use postgres_ffi::waldecoder::WalStreamDecoder;
30 : use postgres_ffi::XLogFileName;
31 : use postgres_ffi::XLOG_BLCKSZ;
32 : use pq_proto::SystemId;
33 : use utils::{id::TenantTimelineId, lsn::Lsn};
34 :
35 : #[async_trait::async_trait]
36 : pub trait Storage {
37 : /// LSN of last durably stored WAL record.
38 : fn flush_lsn(&self) -> Lsn;
39 :
40 : /// Write piece of WAL from buf to disk, but not necessarily sync it.
41 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
42 :
43 : /// Truncate WAL at specified LSN, which must be the end of WAL record.
44 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>;
45 :
46 : /// Durably store WAL on disk, up to the last written WAL record.
47 : async fn flush_wal(&mut self) -> Result<()>;
48 :
49 : /// Remove all segments <= given segno. Returns function doing that as we
50 : /// want to perform it without timeline lock.
51 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>;
52 :
53 : /// Release resources associated with the storage -- technically, close FDs.
54 : /// Currently we don't remove timelines until restart (#3146), so need to
55 : /// spare descriptors. This would be useful for temporary tli detach as
56 : /// well.
57 UBC 0 : fn close(&mut self) {}
58 :
59 : /// Get metrics for this timeline.
60 : fn get_metrics(&self) -> WalStorageMetrics;
61 : }
62 :
63 : /// PhysicalStorage is a storage that stores WAL on disk. Writes are separated from flushes
64 : /// for better performance. Storage is initialized in the constructor.
65 : ///
66 : /// WAL is stored in segments, each segment is a file. Last segment has ".partial" suffix in
67 : /// its filename and may be not fully flushed.
68 : ///
69 : /// Relationship of LSNs:
70 : /// `write_lsn` >= `write_record_lsn` >= `flush_record_lsn`
71 : ///
72 : /// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
73 : pub struct PhysicalStorage {
74 : metrics: WalStorageMetrics,
75 : timeline_dir: Utf8PathBuf,
76 : conf: SafeKeeperConf,
77 :
78 : /// Size of WAL segment in bytes.
79 : wal_seg_size: usize,
80 :
81 : /// Written to disk, but possibly still in the cache and not fully persisted.
82 : /// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
83 : write_lsn: Lsn,
84 :
85 : /// The LSN of the last WAL record written to disk. Still can be not fully flushed.
86 : write_record_lsn: Lsn,
87 :
88 : /// The LSN of the last WAL record flushed to disk.
89 : flush_record_lsn: Lsn,
90 :
91 : /// Decoder is required for detecting boundaries of WAL records.
92 : decoder: WalStreamDecoder,
93 :
94 : /// Cached open file for the last segment.
95 : ///
96 : /// If Some(file) is open, then it always:
97 : /// - has ".partial" suffix
98 : /// - points to write_lsn, so no seek is needed for writing
99 : /// - doesn't point to the end of the segment
100 : file: Option<File>,
101 :
102 : /// When false, we have just initialized storage using the LSN from find_end_of_wal().
103 : /// In this case, [`write_lsn`] can be less than actually written WAL on disk. In particular,
104 : /// there can be a case with unexpected .partial file.
105 : ///
106 : /// Imagine the following:
107 : /// - 000000010000000000000001
108 : /// - it was fully written, but the last record is split between 2 segments
109 : /// - after restart, `find_end_of_wal()` returned 0/1FFFFF0, which is in the end of this segment
110 : /// - `write_lsn`, `write_record_lsn` and `flush_record_lsn` were initialized to 0/1FFFFF0
111 : /// - 000000010000000000000002.partial
112 : /// - it has only 1 byte written, which is not enough to make a full WAL record
113 : ///
114 : /// Partial segment 002 has no WAL records, and it will be removed by the next truncate_wal().
115 : /// This flag will be set to true after the first truncate_wal() call.
116 : ///
117 : /// [`write_lsn`]: Self::write_lsn
118 : is_truncated_after_restart: bool,
119 : }
120 :
121 : impl PhysicalStorage {
122 : /// Create new storage. If commit_lsn is not zero, flush_lsn is tried to be restored from
123 : /// the disk. Otherwise, all LSNs are set to zero.
124 CBC 631 : pub fn new(
125 631 : ttid: &TenantTimelineId,
126 631 : timeline_dir: Utf8PathBuf,
127 631 : conf: &SafeKeeperConf,
128 631 : state: &SafeKeeperState,
129 631 : ) -> Result<PhysicalStorage> {
130 631 : let wal_seg_size = state.server.wal_seg_size as usize;
131 :
132 : // Find out where stored WAL ends, starting at commit_lsn which is a
133 : // known recent record boundary (unless we don't have WAL at all).
134 : //
135 : // NB: find_end_of_wal MUST be backwards compatible with the previously
136 : // written WAL. If find_end_of_wal fails to read any WAL written by an
137 : // older version of the code, we could lose data forever.
138 631 : let write_lsn = if state.commit_lsn == Lsn(0) {
139 446 : Lsn(0)
140 : } else {
141 185 : let version = state.server.pg_version / 10000;
142 185 :
143 185 : dispatch_pgversion!(
144 185 : version,
145 : pgv::xlog_utils::find_end_of_wal(
146 185 : timeline_dir.as_std_path(),
147 185 : wal_seg_size,
148 185 : state.commit_lsn,
149 UBC 0 : )?,
150 0 : bail!("unsupported postgres version: {}", version)
151 : )
152 : };
153 :
154 : // TODO: do we really know that write_lsn is fully flushed to disk?
155 : // If not, maybe it's better to call fsync() here to be sure?
156 CBC 631 : let flush_lsn = write_lsn;
157 631 :
158 631 : debug!(
159 UBC 0 : "initialized storage for timeline {}, flush_lsn={}, commit_lsn={}, peer_horizon_lsn={}",
160 0 : ttid.timeline_id, flush_lsn, state.commit_lsn, state.peer_horizon_lsn,
161 0 : );
162 CBC 631 : if flush_lsn < state.commit_lsn || flush_lsn < state.peer_horizon_lsn {
163 UBC 0 : warn!("timeline {} potential data loss: flush_lsn by find_end_of_wal is less than either commit_lsn or peer_horizon_lsn from control file", ttid.timeline_id);
164 CBC 631 : }
165 :
166 631 : Ok(PhysicalStorage {
167 631 : metrics: WalStorageMetrics::default(),
168 631 : timeline_dir,
169 631 : conf: conf.clone(),
170 631 : wal_seg_size,
171 631 : write_lsn,
172 631 : write_record_lsn: write_lsn,
173 631 : flush_record_lsn: flush_lsn,
174 631 : decoder: WalStreamDecoder::new(write_lsn, state.server.pg_version / 10000),
175 631 : file: None,
176 631 : is_truncated_after_restart: false,
177 631 : })
178 631 : }
179 :
180 : /// Get all known state of the storage.
181 5 : pub fn internal_state(&self) -> (Lsn, Lsn, Lsn, bool) {
182 5 : (
183 5 : self.write_lsn,
184 5 : self.write_record_lsn,
185 5 : self.flush_record_lsn,
186 5 : self.file.is_some(),
187 5 : )
188 5 : }
189 :
190 : /// Call fdatasync if config requires so.
191 761678 : async fn fdatasync_file(&mut self, file: &File) -> Result<()> {
192 761678 : if !self.conf.no_sync {
193 UBC 0 : self.metrics
194 0 : .observe_flush_seconds(time_io_closure(file.sync_data()).await?);
195 CBC 761678 : }
196 761678 : Ok(())
197 761678 : }
198 :
199 : /// Call fsync if config requires so.
200 1129 : async fn fsync_file(&mut self, file: &File) -> Result<()> {
201 1129 : if !self.conf.no_sync {
202 UBC 0 : self.metrics
203 0 : .observe_flush_seconds(time_io_closure(file.sync_all()).await?);
204 CBC 1129 : }
205 1129 : Ok(())
206 1129 : }
207 :
208 : /// Open or create WAL segment file. Caller must call seek to the wanted position.
209 : /// Returns `file` and `is_partial`.
210 1730 : async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> {
211 1730 : let (wal_file_path, wal_file_partial_path) =
212 1730 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
213 :
214 : // Try to open already completed segment
215 1730 : if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await {
216 GBC 1 : Ok((file, false))
217 CBC 1729 : } else if let Ok(file) = OpenOptions::new()
218 1729 : .write(true)
219 1729 : .open(&wal_file_partial_path)
220 1687 : .await
221 : {
222 : // Try to open existing partial file
223 599 : Ok((file, true))
224 : } else {
225 : // Create and fill new partial file
226 1130 : let mut file = OpenOptions::new()
227 1130 : .create(true)
228 1130 : .write(true)
229 1130 : .open(&wal_file_partial_path)
230 1098 : .await
231 1130 : .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?;
232 :
233 2178089 : write_zeroes(&mut file, self.wal_seg_size).await?;
234 1129 : self.fsync_file(&file).await?;
235 1129 : Ok((file, true))
236 : }
237 1729 : }
238 :
239 : /// Write WAL bytes, which are known to be located in a single WAL segment.
240 1217223 : async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> {
241 1217223 : let mut file = if let Some(file) = self.file.take() {
242 1216018 : file
243 : } else {
244 1306008 : let (mut file, is_partial) = self.open_or_create(segno).await?;
245 1204 : assert!(is_partial, "unexpected write into non-partial segment file");
246 1204 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
247 1204 : file
248 : };
249 :
250 1217222 : file.write_all(buf).await?;
251 : // Note: flush just ensures write above reaches the OS (this is not
252 : // needed in case of sync IO as Write::write there calls directly write
253 : // syscall, but needed in case of async). It does *not* fsyncs the file.
254 1217222 : file.flush().await?;
255 :
256 1217222 : if xlogoff + buf.len() == self.wal_seg_size {
257 : // If we reached the end of a WAL segment, flush and close it.
258 686 : self.fdatasync_file(&file).await?;
259 :
260 : // Rename partial file to completed file
261 686 : let (wal_file_path, wal_file_partial_path) =
262 686 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
263 686 : fs::rename(wal_file_partial_path, wal_file_path).await?;
264 1216536 : } else {
265 1216536 : // otherwise, file can be reused later
266 1216536 : self.file = Some(file);
267 1216536 : }
268 :
269 1217222 : Ok(())
270 1217222 : }
271 :
272 : /// Writes WAL to the segment files, until everything is writed. If some segments
273 : /// are fully written, they are flushed to disk. The last (partial) segment can
274 : /// be flushed separately later.
275 : ///
276 : /// Updates `write_lsn`.
277 1216619 : async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> {
278 1216619 : if self.write_lsn != pos {
279 : // need to flush the file before discarding it
280 UBC 0 : if let Some(file) = self.file.take() {
281 0 : self.fdatasync_file(&file).await?;
282 0 : }
283 :
284 0 : self.write_lsn = pos;
285 CBC 1216619 : }
286 :
287 2433841 : while !buf.is_empty() {
288 : // Extract WAL location for this block
289 1217223 : let xlogoff = self.write_lsn.segment_offset(self.wal_seg_size);
290 1217223 : let segno = self.write_lsn.segment_number(self.wal_seg_size);
291 :
292 : // If crossing a WAL boundary, only write up until we reach wal segment size.
293 1217223 : let bytes_write = if xlogoff + buf.len() > self.wal_seg_size {
294 604 : self.wal_seg_size - xlogoff
295 : } else {
296 1216619 : buf.len()
297 : };
298 :
299 1217223 : self.write_in_segment(segno, xlogoff, &buf[..bytes_write])
300 2485345 : .await?;
301 1217222 : self.write_lsn += bytes_write as u64;
302 1217222 : buf = &buf[bytes_write..];
303 : }
304 :
305 1216618 : Ok(())
306 1216618 : }
307 : }
308 :
309 : #[async_trait::async_trait]
310 : impl Storage for PhysicalStorage {
311 : /// flush_lsn returns LSN of last durably stored WAL record.
312 6587983 : fn flush_lsn(&self) -> Lsn {
313 6587983 : self.flush_record_lsn
314 6587983 : }
315 :
316 : /// Write WAL to disk.
317 1216619 : async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
318 : // Disallow any non-sequential writes, which can result in gaps or overwrites.
319 : // If we need to move the pointer, use truncate_wal() instead.
320 1216619 : if self.write_lsn > startpos {
321 UBC 0 : bail!(
322 0 : "write_wal rewrites WAL written before, write_lsn={}, startpos={}",
323 0 : self.write_lsn,
324 0 : startpos
325 0 : );
326 CBC 1216619 : }
327 1216619 : if self.write_lsn < startpos && self.write_lsn != Lsn(0) {
328 UBC 0 : bail!(
329 0 : "write_wal creates gap in written WAL, write_lsn={}, startpos={}",
330 0 : self.write_lsn,
331 0 : startpos
332 0 : );
333 CBC 1216619 : }
334 :
335 2485345 : let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?;
336 : // WAL is written, updating write metrics
337 1216618 : self.metrics.observe_write_seconds(write_seconds);
338 1216618 : self.metrics.observe_write_bytes(buf.len());
339 1216618 :
340 1216618 : // figure out last record's end lsn for reporting (if we got the
341 1216618 : // whole record)
342 1216618 : if self.decoder.available() != startpos {
343 446 : info!(
344 446 : "restart decoder from {} to {}",
345 446 : self.decoder.available(),
346 446 : startpos,
347 446 : );
348 446 : let pg_version = self.decoder.pg_version;
349 446 : self.decoder = WalStreamDecoder::new(startpos, pg_version);
350 1216172 : }
351 1216618 : self.decoder.feed_bytes(buf);
352 : loop {
353 73820978 : match self.decoder.poll_decode()? {
354 1216618 : None => break, // no full record yet
355 72604360 : Some((lsn, _rec)) => {
356 72604360 : self.write_record_lsn = lsn;
357 72604360 : }
358 : }
359 : }
360 :
361 1216618 : Ok(())
362 2433237 : }
363 :
364 1295974 : async fn flush_wal(&mut self) -> Result<()> {
365 1295974 : if self.flush_record_lsn == self.write_record_lsn {
366 : // no need to do extra flush
367 535492 : return Ok(());
368 760482 : }
369 :
370 760482 : if let Some(unflushed_file) = self.file.take() {
371 760462 : self.fdatasync_file(&unflushed_file).await?;
372 760462 : self.file = Some(unflushed_file);
373 : } else {
374 : // We have unflushed data (write_lsn != flush_lsn), but no file.
375 : // This should only happen if last file was fully written and flushed,
376 : // but haven't updated flush_lsn yet.
377 20 : if self.write_lsn.segment_offset(self.wal_seg_size) != 0 {
378 UBC 0 : bail!(
379 0 : "unexpected unflushed data with no open file, write_lsn={}, flush_lsn={}",
380 0 : self.write_lsn,
381 0 : self.flush_record_lsn
382 0 : );
383 CBC 20 : }
384 : }
385 :
386 : // everything is flushed now, let's update flush_lsn
387 760482 : self.flush_record_lsn = self.write_record_lsn;
388 760482 : Ok(())
389 2591948 : }
390 :
391 : /// Truncate written WAL by removing all WAL segments after the given LSN.
392 : /// end_pos must point to the end of the WAL record.
393 834 : async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> {
394 : // Streaming must not create a hole, so truncate cannot be called on non-written lsn
395 834 : if self.write_lsn != Lsn(0) && end_pos > self.write_lsn {
396 UBC 0 : bail!(
397 0 : "truncate_wal called on non-written WAL, write_lsn={}, end_pos={}",
398 0 : self.write_lsn,
399 0 : end_pos
400 0 : );
401 CBC 834 : }
402 834 :
403 834 : // Quick exit if nothing to do to avoid writing up to 16 MiB of zeros on
404 834 : // disk (this happens on each connect).
405 834 : if self.is_truncated_after_restart
406 314 : && end_pos == self.write_lsn
407 309 : && end_pos == self.flush_record_lsn
408 : {
409 309 : return Ok(());
410 525 : }
411 :
412 : // Close previously opened file, if any
413 525 : if let Some(unflushed_file) = self.file.take() {
414 5 : self.fdatasync_file(&unflushed_file).await?;
415 520 : }
416 :
417 525 : let xlogoff = end_pos.segment_offset(self.wal_seg_size);
418 525 : let segno = end_pos.segment_number(self.wal_seg_size);
419 525 :
420 525 : // Remove all segments after the given LSN.
421 525 : remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?;
422 :
423 876551 : let (mut file, is_partial) = self.open_or_create(segno).await?;
424 :
425 : // Fill end with zeroes
426 525 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
427 611191 : write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?;
428 525 : self.fdatasync_file(&file).await?;
429 :
430 525 : if !is_partial {
431 : // Make segment partial once again
432 GBC 1 : let (wal_file_path, wal_file_partial_path) =
433 1 : wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?;
434 1 : fs::rename(wal_file_path, wal_file_partial_path).await?;
435 CBC 524 : }
436 :
437 : // Update LSNs
438 525 : self.write_lsn = end_pos;
439 525 : self.write_record_lsn = end_pos;
440 525 : self.flush_record_lsn = end_pos;
441 525 : self.is_truncated_after_restart = true;
442 525 : Ok(())
443 1668 : }
444 :
445 29 : fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> {
446 29 : let timeline_dir = self.timeline_dir.clone();
447 29 : let wal_seg_size = self.wal_seg_size;
448 29 : Box::pin(async move {
449 60 : remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await
450 29 : })
451 29 : }
452 :
453 25 : fn close(&mut self) {
454 25 : // close happens in destructor
455 25 : let _open_file = self.file.take();
456 25 : }
457 :
458 51 : fn get_metrics(&self) -> WalStorageMetrics {
459 51 : self.metrics.clone()
460 51 : }
461 : }
462 :
463 : /// Remove all WAL segments in timeline_dir that match the given predicate.
464 554 : async fn remove_segments_from_disk(
465 554 : timeline_dir: &Utf8Path,
466 554 : wal_seg_size: usize,
467 554 : remove_predicate: impl Fn(XLogSegNo) -> bool,
468 554 : ) -> Result<()> {
469 554 : let mut n_removed = 0;
470 554 : let mut min_removed = u64::MAX;
471 554 : let mut max_removed = u64::MIN;
472 :
473 554 : let mut entries = fs::read_dir(timeline_dir).await?;
474 1322 : while let Some(entry) = entries.next_entry().await? {
475 768 : let entry_path = entry.path();
476 768 : let fname = entry_path.file_name().unwrap();
477 :
478 768 : if let Some(fname_str) = fname.to_str() {
479 : /* Ignore files that are not XLOG segments */
480 768 : if !IsXLogFileName(fname_str) && !IsPartialXLogFileName(fname_str) {
481 554 : continue;
482 214 : }
483 214 : let (segno, _) = XLogFromFileName(fname_str, wal_seg_size);
484 214 : if remove_predicate(segno) {
485 15 : remove_file(entry_path).await?;
486 15 : n_removed += 1;
487 15 : min_removed = min(min_removed, segno);
488 15 : max_removed = max(max_removed, segno);
489 15 : REMOVED_WAL_SEGMENTS.inc();
490 199 : }
491 UBC 0 : }
492 : }
493 :
494 CBC 554 : if n_removed > 0 {
495 8 : info!(
496 8 : "removed {} WAL segments [{}; {}]",
497 8 : n_removed, min_removed, max_removed
498 8 : );
499 546 : }
500 554 : Ok(())
501 554 : }
502 :
503 : pub struct WalReader {
504 : workdir: Utf8PathBuf,
505 : timeline_dir: Utf8PathBuf,
506 : wal_seg_size: usize,
507 : pos: Lsn,
508 : wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
509 :
510 : // S3 will be used to read WAL if LSN is not available locally
511 : enable_remote_read: bool,
512 :
513 : // We don't have WAL locally if LSN is less than local_start_lsn
514 : local_start_lsn: Lsn,
515 : // We will respond with zero-ed bytes before this Lsn as long as
516 : // pos is in the same segment as timeline_start_lsn.
517 : timeline_start_lsn: Lsn,
518 : // integer version number of PostgreSQL, e.g. 14; 15; 16
519 : pg_version: u32,
520 : system_id: SystemId,
521 : timeline_start_segment: Option<Bytes>,
522 : }
523 :
524 : impl WalReader {
525 846 : pub fn new(
526 846 : workdir: Utf8PathBuf,
527 846 : timeline_dir: Utf8PathBuf,
528 846 : state: &SafeKeeperState,
529 846 : start_pos: Lsn,
530 846 : enable_remote_read: bool,
531 846 : ) -> Result<Self> {
532 846 : if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
533 UBC 0 : bail!("state uninitialized, no data to read");
534 CBC 846 : }
535 846 :
536 846 : // TODO: Upgrade to bail!() once we know this couldn't possibly happen
537 846 : if state.timeline_start_lsn == Lsn(0) {
538 1 : warn!("timeline_start_lsn uninitialized before initializing wal reader");
539 845 : }
540 :
541 846 : if start_pos
542 846 : < state
543 846 : .timeline_start_lsn
544 846 : .segment_lsn(state.server.wal_seg_size as usize)
545 : {
546 UBC 0 : bail!(
547 0 : "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
548 0 : start_pos,
549 0 : state.timeline_start_lsn
550 0 : );
551 CBC 846 : }
552 846 :
553 846 : Ok(Self {
554 846 : workdir,
555 846 : timeline_dir,
556 846 : wal_seg_size: state.server.wal_seg_size as usize,
557 846 : pos: start_pos,
558 846 : wal_segment: None,
559 846 : enable_remote_read,
560 846 : local_start_lsn: state.local_start_lsn,
561 846 : timeline_start_lsn: state.timeline_start_lsn,
562 846 : pg_version: state.server.pg_version / 10000,
563 846 : system_id: state.server.system_id,
564 846 : timeline_start_segment: None,
565 846 : })
566 846 : }
567 :
568 : /// Read WAL at current position into provided buf, returns number of bytes
569 : /// read. It can be smaller than buf size only if segment boundary is
570 : /// reached.
571 576387 : pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
572 576387 : // If this timeline is new, we may not have a full segment yet, so
573 576387 : // we pad the first bytes of the timeline's first WAL segment with 0s
574 576387 : if self.pos < self.timeline_start_lsn {
575 : debug_assert_eq!(
576 265 : self.pos.segment_number(self.wal_seg_size),
577 265 : self.timeline_start_lsn.segment_number(self.wal_seg_size)
578 : );
579 :
580 : // All bytes after timeline_start_lsn are in WAL, but those before
581 : // are not, so we manually construct an empty segment for the bytes
582 : // not available in this timeline.
583 265 : if self.timeline_start_segment.is_none() {
584 5 : let it = postgres_ffi::generate_wal_segment(
585 5 : self.timeline_start_lsn.segment_number(self.wal_seg_size),
586 5 : self.system_id,
587 5 : self.pg_version,
588 5 : self.timeline_start_lsn,
589 5 : )?;
590 5 : self.timeline_start_segment = Some(it);
591 260 : }
592 :
593 265 : assert!(self.timeline_start_segment.is_some());
594 265 : let segment = self.timeline_start_segment.take().unwrap();
595 265 :
596 265 : let seg_bytes = &segment[..];
597 265 :
598 265 : // How much of the current segment have we already consumed?
599 265 : let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
600 265 :
601 265 : // How many bytes may we consume in total?
602 265 : let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
603 :
604 265 : debug_assert!(seg_bytes.len() > pos_seg_offset);
605 265 : debug_assert!(seg_bytes.len() > tl_start_seg_offset);
606 :
607 : // Copy as many bytes as possible into the buffer
608 265 : let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
609 265 : buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
610 265 :
611 265 : self.pos += len as u64;
612 265 :
613 265 : // If we're done with the segment, we can release it's memory.
614 265 : // However, if we're not yet done, store it so that we don't have to
615 265 : // construct the segment the next time this function is called.
616 265 : if self.pos < self.timeline_start_lsn {
617 260 : self.timeline_start_segment = Some(segment);
618 260 : }
619 :
620 265 : return Ok(len);
621 576122 : }
622 :
623 576122 : let mut wal_segment = match self.wal_segment.take() {
624 574734 : Some(reader) => reader,
625 2997 : None => self.open_segment().await?,
626 : };
627 :
628 : // How much to read and send in message? We cannot cross the WAL file
629 : // boundary, and we don't want send more than provided buffer.
630 576122 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
631 576122 : let send_size = min(buf.len(), self.wal_seg_size - xlogoff);
632 576122 :
633 576122 : // Read some data from the file.
634 576122 : let buf = &mut buf[0..send_size];
635 576122 : let send_size = wal_segment.read_exact(buf).await?;
636 576121 : self.pos += send_size as u64;
637 576121 :
638 576121 : // Decide whether to reuse this file. If we don't set wal_segment here
639 576121 : // a new reader will be opened next time.
640 576121 : if self.pos.segment_offset(self.wal_seg_size) != 0 {
641 575466 : self.wal_segment = Some(wal_segment);
642 575466 : }
643 :
644 576121 : Ok(send_size)
645 576386 : }
646 :
647 : /// Open WAL segment at the current position of the reader.
648 1388 : async fn open_segment(&self) -> Result<Pin<Box<dyn AsyncRead + Send + Sync>>> {
649 1388 : let xlogoff = self.pos.segment_offset(self.wal_seg_size);
650 1388 : let segno = self.pos.segment_number(self.wal_seg_size);
651 1388 : let wal_file_name = XLogFileName(PG_TLI, segno, self.wal_seg_size);
652 1388 : let wal_file_path = self.timeline_dir.join(wal_file_name);
653 1388 :
654 1388 : // Try to open local file, if we may have WAL locally
655 1388 : if self.pos >= self.local_start_lsn {
656 1581 : let res = Self::open_wal_file(&wal_file_path).await;
657 1386 : match res {
658 1350 : Ok(mut file) => {
659 1350 : file.seek(SeekFrom::Start(xlogoff as u64)).await?;
660 1350 : return Ok(Box::pin(file));
661 : }
662 36 : Err(e) => {
663 72 : let is_not_found = e.chain().any(|e| {
664 72 : if let Some(e) = e.downcast_ref::<io::Error>() {
665 36 : e.kind() == io::ErrorKind::NotFound
666 : } else {
667 36 : false
668 : }
669 72 : });
670 36 : if !is_not_found {
671 UBC 0 : return Err(e);
672 CBC 36 : }
673 : // NotFound is expected, fall through to remote read
674 : }
675 : };
676 2 : }
677 :
678 : // Try to open remote file, if remote reads are enabled
679 38 : if self.enable_remote_read {
680 38 : let remote_wal_file_path = wal_file_path
681 38 : .strip_prefix(&self.workdir)
682 38 : .context("Failed to strip workdir prefix")
683 38 : .and_then(RemotePath::new)
684 38 : .with_context(|| {
685 UBC 0 : format!(
686 0 : "Failed to resolve remote part of path {:?} for base {:?}",
687 0 : wal_file_path, self.workdir,
688 0 : )
689 CBC 38 : })?;
690 110 : return read_object(&remote_wal_file_path, xlogoff as u64).await;
691 UBC 0 : }
692 0 :
693 0 : bail!("WAL segment is not found")
694 CBC 1388 : }
695 :
696 : /// Helper function for opening a wal file.
697 1386 : async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
698 1386 : // First try to open the .partial file.
699 1386 : let mut partial_path = wal_file_path.to_owned();
700 1386 : partial_path.set_extension("partial");
701 1386 : if let Ok(opened_file) = tokio::fs::File::open(&partial_path).await {
702 1146 : return Ok(opened_file);
703 240 : }
704 240 :
705 240 : // If that failed, try it without the .partial extension.
706 240 : tokio::fs::File::open(&wal_file_path)
707 238 : .await
708 240 : .with_context(|| format!("Failed to open WAL file {:?}", wal_file_path))
709 240 : .map_err(|e| {
710 36 : warn!("{}", e);
711 36 : e
712 240 : })
713 1386 : }
714 : }
715 :
716 : /// Zero block for filling created WAL segments.
717 : const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ];
718 :
719 : /// Helper for filling file with zeroes.
720 1655 : async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
721 2947427 : while count >= XLOG_BLCKSZ {
722 2945773 : file.write_all(ZERO_BLOCK).await?;
723 2945772 : count -= XLOG_BLCKSZ;
724 : }
725 1654 : file.write_all(&ZERO_BLOCK[0..count]).await?;
726 1654 : file.flush().await?;
727 1654 : Ok(())
728 1654 : }
729 :
730 : /// Helper returning full path to WAL segment file and its .partial brother.
731 2465 : pub fn wal_file_paths(
732 2465 : timeline_dir: &Utf8Path,
733 2465 : segno: XLogSegNo,
734 2465 : wal_seg_size: usize,
735 2465 : ) -> Result<(Utf8PathBuf, Utf8PathBuf)> {
736 2465 : let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
737 2465 : let wal_file_path = timeline_dir.join(wal_file_name.clone());
738 2465 : let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");
739 2465 : Ok((wal_file_path, wal_file_partial_path))
740 2465 : }
|