Line data Source code
1 : use std::sync::Arc;
2 :
3 : use async_stream::try_stream;
4 : use bytes::Bytes;
5 : use futures::Stream;
6 : use postgres_backend::CopyStreamHandlerEnd;
7 : use safekeeper_api::Term;
8 : use std::time::Duration;
9 : use tokio::time::timeout;
10 : use utils::lsn::Lsn;
11 :
12 : use crate::{
13 : send_wal::{EndWatch, WalSenderGuard},
14 : timeline::WalResidentTimeline,
15 : };
16 :
17 : pub(crate) struct WalReaderStreamBuilder {
18 : pub(crate) tli: WalResidentTimeline,
19 : pub(crate) start_pos: Lsn,
20 : pub(crate) end_pos: Lsn,
21 : pub(crate) term: Option<Term>,
22 : pub(crate) end_watch: EndWatch,
23 : pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
24 : }
25 :
26 : impl WalReaderStreamBuilder {
27 0 : pub(crate) fn start_pos(&self) -> Lsn {
28 0 : self.start_pos
29 0 : }
30 : }
31 :
32 : pub(crate) struct WalBytes {
33 : /// Raw PG WAL
34 : pub(crate) wal: Bytes,
35 : /// Start LSN of [`Self::wal`]
36 : #[allow(dead_code)]
37 : pub(crate) wal_start_lsn: Lsn,
38 : /// End LSN of [`Self::wal`]
39 : pub(crate) wal_end_lsn: Lsn,
40 : /// End LSN of WAL available on the safekeeper.
41 : ///
42 : /// For pagservers this will be commit LSN,
43 : /// while for the compute it will be the flush LSN.
44 : pub(crate) available_wal_end_lsn: Lsn,
45 : }
46 :
47 : impl WalReaderStreamBuilder {
48 : /// Builds a stream of Postgres WAL starting from [`Self::start_pos`].
49 : /// The stream terminates when the receiver (pageserver) is fully caught up
50 : /// and there's no active computes.
51 0 : pub(crate) async fn build(
52 0 : self,
53 0 : buffer_size: usize,
54 0 : ) -> anyhow::Result<impl Stream<Item = Result<WalBytes, CopyStreamHandlerEnd>>> {
55 0 : // TODO(vlad): The code below duplicates functionality from [`crate::send_wal`].
56 0 : // We can make the raw WAL sender use this stream too and remove the duplication.
57 0 : let Self {
58 0 : tli,
59 0 : mut start_pos,
60 0 : mut end_pos,
61 0 : term,
62 0 : mut end_watch,
63 0 : wal_sender_guard,
64 0 : } = self;
65 0 : let mut wal_reader = tli.get_walreader(start_pos).await?;
66 0 : let mut buffer = vec![0; buffer_size];
67 :
68 : const POLL_STATE_TIMEOUT: Duration = Duration::from_secs(1);
69 :
70 0 : Ok(try_stream! {
71 0 : loop {
72 0 : let have_something_to_send = end_pos > start_pos;
73 0 :
74 0 : if !have_something_to_send {
75 0 : // wait for lsn
76 0 : let res = timeout(POLL_STATE_TIMEOUT, end_watch.wait_for_lsn(start_pos, term)).await;
77 0 : match res {
78 0 : Ok(ok) => {
79 0 : end_pos = ok?;
80 0 : },
81 0 : Err(_) => {
82 0 : if let EndWatch::Commit(_) = end_watch {
83 0 : if let Some(remote_consistent_lsn) = wal_sender_guard
84 0 : .walsenders()
85 0 : .get_ws_remote_consistent_lsn(wal_sender_guard.id())
86 0 : {
87 0 : if tli.should_walsender_stop(remote_consistent_lsn).await {
88 0 : // Stop streaming if the receivers are caught up and
89 0 : // there's no active compute. This causes the loop in
90 0 : // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
91 0 : // to exit and terminate the WAL stream.
92 0 : return;
93 0 : }
94 0 : }
95 0 : }
96 0 :
97 0 : continue;
98 0 : }
99 0 : }
100 0 : }
101 0 :
102 0 :
103 0 : assert!(
104 0 : end_pos > start_pos,
105 0 : "nothing to send after waiting for WAL"
106 0 : );
107 0 :
108 0 : // try to send as much as available, capped by the buffer size
109 0 : let mut chunk_end_pos = start_pos + buffer_size as u64;
110 0 : // if we went behind available WAL, back off
111 0 : if chunk_end_pos >= end_pos {
112 0 : chunk_end_pos = end_pos;
113 0 : } else {
114 0 : // If sending not up to end pos, round down to page boundary to
115 0 : // avoid breaking WAL record not at page boundary, as protocol
116 0 : // demands. See walsender.c (XLogSendPhysical).
117 0 : chunk_end_pos = chunk_end_pos
118 0 : .checked_sub(chunk_end_pos.block_offset())
119 0 : .unwrap();
120 0 : }
121 0 : let send_size = (chunk_end_pos.0 - start_pos.0) as usize;
122 0 : let buffer = &mut buffer[..send_size];
123 0 : let send_size: usize;
124 0 : {
125 0 : // If uncommitted part is being pulled, check that the term is
126 0 : // still the expected one.
127 0 : let _term_guard = if let Some(t) = term {
128 0 : Some(tli.acquire_term(t).await?)
129 0 : } else {
130 0 : None
131 0 : };
132 0 : // Read WAL into buffer. send_size can be additionally capped to
133 0 : // segment boundary here.
134 0 : send_size = wal_reader.read(buffer).await?
135 0 : };
136 0 : let wal = Bytes::copy_from_slice(&buffer[..send_size]);
137 0 :
138 0 : yield WalBytes {
139 0 : wal,
140 0 : wal_start_lsn: start_pos,
141 0 : wal_end_lsn: start_pos + send_size as u64,
142 0 : available_wal_end_lsn: end_pos
143 0 : };
144 0 :
145 0 : start_pos += send_size as u64;
146 0 : }
147 0 : })
148 0 : }
149 : }
|