Line data Source code
1 : use std::sync::Arc;
2 :
3 : use async_stream::try_stream;
4 : use bytes::Bytes;
5 : use futures::Stream;
6 : use postgres_backend::CopyStreamHandlerEnd;
7 : use std::time::Duration;
8 : use tokio::time::timeout;
9 : use utils::lsn::Lsn;
10 :
11 : use crate::{
12 : safekeeper::Term,
13 : send_wal::{EndWatch, WalSenderGuard},
14 : timeline::WalResidentTimeline,
15 : };
16 :
17 : pub(crate) struct WalReaderStreamBuilder {
18 : pub(crate) tli: WalResidentTimeline,
19 : pub(crate) start_pos: Lsn,
20 : pub(crate) end_pos: Lsn,
21 : pub(crate) term: Option<Term>,
22 : pub(crate) end_watch: EndWatch,
23 : pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
24 : }
25 :
26 : impl WalReaderStreamBuilder {
27 0 : pub(crate) fn start_pos(&self) -> Lsn {
28 0 : self.start_pos
29 0 : }
30 : }
31 :
32 : pub(crate) struct WalBytes {
33 : /// Raw PG WAL
34 : pub(crate) wal: Bytes,
35 : /// Start LSN of [`Self::wal`]
36 : pub(crate) wal_start_lsn: Lsn,
37 : /// End LSN of [`Self::wal`]
38 : pub(crate) wal_end_lsn: Lsn,
39 : /// End LSN of WAL available on the safekeeper
40 : pub(crate) commit_lsn: Lsn,
41 : }
42 :
43 : impl WalReaderStreamBuilder {
44 : /// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
45 : /// The stream terminates when the receiver (pageserver) is fully caught up
46 : /// and there's no active computes.
47 0 : pub(crate) async fn build(
48 0 : self,
49 0 : buffer_size: usize,
50 0 : ) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
51 0 : // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
52 0 : // We can make the raw WAL sender use this stream too and remove the duplication.
53 0 : let Self {
54 0 : tli,
55 0 : mut start_pos,
56 0 : mut end_pos,
57 0 : term,
58 0 : mut end_watch,
59 0 : wal_sender_guard,
60 0 : } = self;
61 0 : let mut wal_reader = tli.get_walreader(start_pos).await?;
62 0 : let mut buffer = vec![0; buffer_size];
63 :
64 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
65 :
66 0 : Ok(try_stream! {
67 0 : loop {
68 0 : let have_something_to_send = end_pos > start_pos;
69 0 :
70 0 : if !have_something_to_send {
71 0 : // wait for lsn
72 0 : let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
73 0 : match res {
74 0 : Ok(ok) => {
75 0 : end_pos = ok?;
76 0 : },
77 0 : Err(_) => {
78 0 : if let EndWatch::Commit(_) = end_watch {
79 0 : if let Some(remote_consistent_lsn) = wal_sender_guard
80 0 : .walsenders()
81 0 : .get_ws_remote_consistent_lsn(wal_sender_guard.id())
82 0 : {
83 0 : if tli.should_walsender_stop(remote_consistent_lsn).await {
84 0 : // Terminate if there is nothing more to send.
85 0 : // Note that "ending streaming" part of the string is used by
86 0 : // pageserver to identify WalReceiverError::SuccessfulCompletion,
87 0 : // do not change this string without updating pageserver.
88 0 : return;
89 0 : }
90 0 : }
91 0 : }
92 0 :
93 0 : continue;
94 0 : }
95 0 : }
96 0 : }
97 0 :
98 0 :
99 0 : assert!(
100 0 : end_pos > start_pos,
101 0 : "nothing to send after waiting for WAL"
102 0 : );
103 0 :
104 0 : // try to send as much as available, capped by the buffer size
105 0 : let mut chunk_end_pos = start_pos + buffer_size as u64;
106 0 : // if we went behind available WAL, back off
107 0 : if chunk_end_pos >= end_pos {
108 0 : chunk_end_pos = end_pos;
109 0 : } else {
110 0 : // If sending not up to end pos, round down to page boundary to
111 0 : // avoid breaking WAL record not at page boundary, as protocol
112 0 : // demands. See walsender.c (XLogSendPhysical).
113 0 : chunk_end_pos = chunk_end_pos
114 0 : .checked_sub(chunk_end_pos.block_offset())
115 0 : .unwrap();
116 0 : }
117 0 : let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
118 0 : let buffer = &mut buffer[..send_size];
119 0 : let send_size: usize;
120 0 : {
121 0 : // If uncommitted part is being pulled, check that the term is
122 0 : // still the expected one.
123 0 : let _term_guard = if let Some(t) = term {
124 0 : Some(tli.acquire_term(t).await?)
125 0 : } else {
126 0 : None
127 0 : };
128 0 : // Read WAL into buffer. send_size can be additionally capped to
129 0 : // segment boundary here.
130 0 : send_size = wal_reader.read(buffer).await?
131 0 : };
132 0 : let wal = Bytes::copy_from_slice(&buffer[..send_size]);
133 0 :
134 0 : yield WalBytes {
135 0 : wal,
136 0 : wal_start_lsn: start_pos,
137 0 : wal_end_lsn: start_pos + send_size as u64,
138 0 : commit_lsn: end_pos
139 0 : };
140 0 :
141 0 : start_pos += send_size as u64;
142 0 : }
143 0 : })
144 0 : }
145 : }
|