Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Display;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use anyhow::{anyhow, Context};
7 : use futures::future::Either;
8 : use futures::StreamExt;
9 : use pageserver_api::shard::ShardIdentity;
10 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
11 : use postgres_ffi::waldecoder::WalDecodeError;
12 : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
13 : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
14 : use tokio::io::{AsyncRead, AsyncWrite};
15 : use tokio::sync::mpsc::error::SendError;
16 : use tokio::task::JoinHandle;
17 : use tokio::time::MissedTickBehavior;
18 : use tracing::{error, info, info_span, Instrument};
19 : use utils::critical;
20 : use utils::lsn::Lsn;
21 : use utils::postgres_client::Compression;
22 : use utils::postgres_client::InterpretedFormat;
23 : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
24 : use wal_decoder::wire_format::ToWireFormat;
25 :
26 : use crate::metrics::WAL_READERS;
27 : use crate::send_wal::{EndWatchView, WalSenderGuard};
28 : use crate::timeline::WalResidentTimeline;
29 : use crate::wal_reader_stream::{StreamingWalReader, WalBytes};
30 :
/// Distinguishes senders that are attached to the same shard.
///
/// Normally a shard has exactly one sender, but two pageservers may
/// briefly have the same shard attached and both try to ingest its WAL.
/// See also [`ShardSenderId`].
#[derive(Hash, Eq, PartialEq, Copy, Clone)]
struct SenderId(u8);

impl SenderId {
    /// Identifier given to the first sender of a shard.
    fn first() -> Self {
        Self(0)
    }

    /// Identifier that follows `self`.
    ///
    /// # Panics
    /// Panics when the `u8` identifier space is exhausted.
    fn next(&self) -> Self {
        self.0.checked_add(1).map(Self).expect("few senders")
    }
}
49 :
50 : #[derive(Hash, Eq, PartialEq)]
51 : struct ShardSenderId {
52 : shard: ShardIdentity,
53 : sender_id: SenderId,
54 : }
55 :
56 : impl Display for ShardSenderId {
57 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 0 : write!(f, "{}{}", self.sender_id.0, self.shard.shard_slug())
59 0 : }
60 : }
61 :
62 : impl ShardSenderId {
63 865 : fn new(shard: ShardIdentity, sender_id: SenderId) -> Self {
64 865 : ShardSenderId { shard, sender_id }
65 865 : }
66 :
67 0 : fn shard(&self) -> ShardIdentity {
68 0 : self.shard
69 0 : }
70 : }
71 :
/// Shard-aware fan-out interpreted record reader.
/// Reads WAL from disk, decodes it, interprets it, and sends
/// it to every [`InterpretedWalSender`] connected to it.
/// Each [`InterpretedWalSender`] corresponds to one shard
/// and gets interpreted records concerning that shard only.
pub(crate) struct InterpretedWalReader {
    /// Source of raw WAL bytes. It is reset to an earlier position when a
    /// sender attaches below the reader's current position.
    wal_stream: StreamingWalReader,
    /// Attached senders, keyed by shard. Usually one sender per shard, but
    /// several may be attached to the same shard (see [`SenderId`]).
    shard_senders: HashMap<ShardIdentity, smallvec::SmallVec<[ShardSenderState; 1]>>,
    /// Receives requests to attach new senders. `Some` only for readers built by
    /// [`InterpretedWalReader::spawn`]; `None` for [`InterpretedWalReader::new`].
    shard_notification_rx: Option<tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>>,
    /// Position/liveness of the reader, shared with [`InterpretedWalReaderHandle`].
    state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
    /// Postgres major version, passed to the WAL decoder and record interpreter.
    pg_version: u32,
}
84 :
/// A handle for [`InterpretedWalReader`] which allows for interacting with it
/// when it runs as a separate tokio task.
///
/// Dropping the handle aborts the task.
#[derive(Debug)]
pub(crate) struct InterpretedWalReaderHandle {
    /// The tokio task running the reader.
    join_handle: JoinHandle<Result<(), InterpretedWalReaderError>>,
    /// State shared with the reader task; read by `current_position`.
    state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
    /// Sending half of the attach-request channel used by `fanout`.
    shard_notification_tx: tokio::sync::mpsc::UnboundedSender<AttachShardNotification>,
}
93 :
/// Per-sender bookkeeping kept by [`InterpretedWalReader`].
struct ShardSenderState {
    /// Distinguishes this sender from other senders of the same shard.
    sender_id: SenderId,
    /// Channel on which batches for this sender are delivered.
    tx: tokio::sync::mpsc::Sender<Batch>,
    /// Records whose `next_record_lsn` is not greater than this value are
    /// filtered out for this sender. Starts at the sender's start position and
    /// advances as batches are delivered successfully.
    next_record_lsn: Lsn,
}
99 :
/// State of [`InterpretedWalReader`] visible outside of the task running it.
#[derive(Debug)]
pub(crate) enum InterpretedWalReaderState {
    /// The reader is active. `current_position` is the `next_record_lsn` of the
    /// most recently decoded record, or the position the reader was last
    /// (re)started from.
    Running { current_position: Lsn },
    /// The reader's run loop has ended; set by a scope guard however it exits.
    Done,
}
106 :
/// Unit of data delivered by [`InterpretedWalReader`] to a single sender.
pub(crate) struct Batch {
    /// End LSN of the chunk of raw WAL this batch was produced from.
    wal_end_lsn: Lsn,
    /// `available_wal_end_lsn` reported by the WAL stream for that chunk;
    /// forwarded as `commit_lsn` by [`InterpretedWalSender`].
    available_wal_end_lsn: Lsn,
    /// Interpreted records for the receiving sender's shard (may be empty).
    records: InterpretedWalRecords,
}
112 :
/// Errors that terminate an [`InterpretedWalReader`].
#[derive(thiserror::Error, Debug)]
pub enum InterpretedWalReaderError {
    /// Raw WAL bytes could not be decoded into records.
    #[error("decode error: {0}")]
    Decode(#[from] WalDecodeError),
    /// Reading WAL from the stream, or interpreting a decoded record, failed.
    #[error("read or interpret error: {0}")]
    ReadOrInterpret(#[from] anyhow::Error),
    /// The WAL stream ended; it is expected to be endless, so this indicates a fault.
    #[error("wal stream closed")]
    WalStreamClosed,
}
123 :
124 : enum CurrentPositionUpdate {
125 : Reset(Lsn),
126 : NotReset(Lsn),
127 : }
128 :
129 : impl CurrentPositionUpdate {
130 0 : fn current_position(&self) -> Lsn {
131 0 : match self {
132 0 : CurrentPositionUpdate::Reset(lsn) => *lsn,
133 0 : CurrentPositionUpdate::NotReset(lsn) => *lsn,
134 : }
135 0 : }
136 : }
137 :
138 : impl InterpretedWalReaderState {
139 4 : fn current_position(&self) -> Option<Lsn> {
140 4 : match self {
141 : InterpretedWalReaderState::Running {
142 2 : current_position, ..
143 2 : } => Some(*current_position),
144 2 : InterpretedWalReaderState::Done => None,
145 : }
146 4 : }
147 :
148 : // Reset the current position of the WAL reader if the requested starting position
149 : // of the new shard is smaller than the current value.
150 3 : fn maybe_reset(&mut self, new_shard_start_pos: Lsn) -> CurrentPositionUpdate {
151 3 : match self {
152 : InterpretedWalReaderState::Running {
153 3 : current_position, ..
154 3 : } => {
155 3 : if new_shard_start_pos < *current_position {
156 2 : *current_position = new_shard_start_pos;
157 2 : CurrentPositionUpdate::Reset(*current_position)
158 : } else {
159 1 : CurrentPositionUpdate::NotReset(*current_position)
160 : }
161 : }
162 : InterpretedWalReaderState::Done => {
163 0 : panic!("maybe_reset called on finished reader")
164 : }
165 : }
166 3 : }
167 : }
168 :
/// Request, sent via `InterpretedWalReaderHandle::fanout`, asking a running
/// reader to attach a new sender.
pub(crate) struct AttachShardNotification {
    /// Shard the new sender wants records for.
    shard_id: ShardIdentity,
    /// Channel on which the reader will deliver batches to the new sender.
    sender: tokio::sync::mpsc::Sender<Batch>,
    /// Position the sender starts from; if below the reader's current position
    /// the reader rewinds its WAL stream.
    start_pos: Lsn,
}
174 :
175 : impl InterpretedWalReader {
176 : /// Spawn the reader in a separate tokio task and return a handle
177 2 : pub(crate) fn spawn(
178 2 : wal_stream: StreamingWalReader,
179 2 : start_pos: Lsn,
180 2 : tx: tokio::sync::mpsc::Sender<Batch>,
181 2 : shard: ShardIdentity,
182 2 : pg_version: u32,
183 2 : appname: &Option<String>,
184 2 : ) -> InterpretedWalReaderHandle {
185 2 : let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
186 2 : current_position: start_pos,
187 2 : }));
188 2 :
189 2 : let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
190 :
191 2 : let reader = InterpretedWalReader {
192 2 : wal_stream,
193 2 : shard_senders: HashMap::from([(
194 2 : shard,
195 2 : smallvec::smallvec![ShardSenderState {
196 0 : sender_id: SenderId::first(),
197 0 : tx,
198 0 : next_record_lsn: start_pos,
199 0 : }],
200 : )]),
201 2 : shard_notification_rx: Some(shard_notification_rx),
202 2 : state: state.clone(),
203 2 : pg_version,
204 2 : };
205 2 :
206 2 : let metric = WAL_READERS
207 2 : .get_metric_with_label_values(&["task", appname.as_deref().unwrap_or("safekeeper")])
208 2 : .unwrap();
209 :
210 2 : let join_handle = tokio::task::spawn(
211 2 : async move {
212 2 : metric.inc();
213 2 : scopeguard::defer! {
214 2 : metric.dec();
215 2 : }
216 2 :
217 2 : reader
218 2 : .run_impl(start_pos)
219 2 : .await
220 0 : .inspect_err(|err| critical!("failed to read WAL record: {err:?}"))
221 0 : }
222 2 : .instrument(info_span!("interpreted wal reader")),
223 : );
224 :
225 2 : InterpretedWalReaderHandle {
226 2 : join_handle,
227 2 : state,
228 2 : shard_notification_tx,
229 2 : }
230 2 : }
231 :
232 : /// Construct the reader without spawning anything
233 : /// Callers should drive the future returned by [`Self::run`].
234 0 : pub(crate) fn new(
235 0 : wal_stream: StreamingWalReader,
236 0 : start_pos: Lsn,
237 0 : tx: tokio::sync::mpsc::Sender<Batch>,
238 0 : shard: ShardIdentity,
239 0 : pg_version: u32,
240 0 : ) -> InterpretedWalReader {
241 0 : let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
242 0 : current_position: start_pos,
243 0 : }));
244 0 :
245 0 : InterpretedWalReader {
246 0 : wal_stream,
247 0 : shard_senders: HashMap::from([(
248 0 : shard,
249 0 : smallvec::smallvec![ShardSenderState {
250 0 : sender_id: SenderId::first(),
251 0 : tx,
252 0 : next_record_lsn: start_pos,
253 0 : }],
254 : )]),
255 0 : shard_notification_rx: None,
256 0 : state: state.clone(),
257 0 : pg_version,
258 0 : }
259 0 : }
260 :
261 : /// Entry point for future (polling) based wal reader.
262 0 : pub(crate) async fn run(
263 0 : self,
264 0 : start_pos: Lsn,
265 0 : appname: &Option<String>,
266 0 : ) -> Result<(), CopyStreamHandlerEnd> {
267 0 : let metric = WAL_READERS
268 0 : .get_metric_with_label_values(&["future", appname.as_deref().unwrap_or("safekeeper")])
269 0 : .unwrap();
270 0 :
271 0 : metric.inc();
272 0 : scopeguard::defer! {
273 0 : metric.dec();
274 0 : }
275 :
276 0 : if let Err(err) = self.run_impl(start_pos).await {
277 0 : critical!("failed to read WAL record: {err:?}");
278 : } else {
279 0 : info!("interpreted wal reader exiting");
280 : }
281 :
282 0 : Err(CopyStreamHandlerEnd::Other(anyhow!(
283 0 : "interpreted wal reader finished"
284 0 : )))
285 0 : }
286 :
287 : /// Send interpreted WAL to one or more [`InterpretedWalSender`]s
288 : /// Stops when an error is encountered or when the [`InterpretedWalReaderHandle`]
289 : /// goes out of scope.
290 2 : async fn run_impl(mut self, start_pos: Lsn) -> Result<(), InterpretedWalReaderError> {
291 2 : let defer_state = self.state.clone();
292 2 : scopeguard::defer! {
293 2 : *defer_state.write().unwrap() = InterpretedWalReaderState::Done;
294 2 : }
295 2 :
296 2 : let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
297 2 :
298 2 : // Tracks the start of the PG WAL LSN from which the current batch of
299 2 : // interpreted records originated.
300 2 : let mut current_batch_wal_start_lsn: Option<Lsn> = None;
301 :
302 : loop {
303 44 : tokio::select! {
304 : // Main branch for reading WAL and forwarding it
305 44 : wal_or_reset = self.wal_stream.next() => {
306 39 : let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
307 : let WalBytes {
308 39 : wal,
309 39 : wal_start_lsn,
310 39 : wal_end_lsn,
311 39 : available_wal_end_lsn,
312 39 : } = match wal {
313 39 : Some(some) => some.map_err(InterpretedWalReaderError::ReadOrInterpret)?,
314 : None => {
315 : // [`StreamingWalReader::next`] is an endless stream of WAL.
316 : // It shouldn't ever finish unless it panicked or became internally
317 : // inconsistent.
318 0 : return Result::Err(InterpretedWalReaderError::WalStreamClosed);
319 : }
320 : };
321 :
322 : // We will already have a value if the previous chunks of WAL
323 : // did not decode into anything useful.
324 39 : if current_batch_wal_start_lsn.is_none() {
325 39 : current_batch_wal_start_lsn = Some(wal_start_lsn);
326 39 : }
327 :
328 39 : wal_decoder.feed_bytes(&wal);
329 39 :
330 39 : // Deserialize and interpret WAL records from this batch of WAL.
331 39 : // Interpreted records for each shard are collected separately.
332 39 : let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>();
333 39 : let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new();
334 39 : let mut max_next_record_lsn = None;
335 635 : while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()?
336 : {
337 596 : assert!(next_record_lsn.is_aligned());
338 596 : max_next_record_lsn = Some(next_record_lsn);
339 :
340 596 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
341 596 : recdata,
342 596 : &shard_ids,
343 596 : next_record_lsn,
344 596 : self.pg_version,
345 596 : )
346 596 : .with_context(|| "Failed to interpret WAL")?;
347 :
348 1391 : for (shard, record) in interpreted {
349 795 : if record.is_empty() {
350 199 : continue;
351 596 : }
352 596 :
353 596 : let mut states_iter = self.shard_senders
354 596 : .get(&shard)
355 596 : .expect("keys collected above")
356 596 : .iter()
357 992 : .filter(|state| record.next_record_lsn > state.next_record_lsn)
358 596 : .peekable();
359 986 : while let Some(state) = states_iter.next() {
360 787 : let shard_sender_id = ShardSenderId::new(shard, state.sender_id);
361 787 :
362 787 : // The most commont case is one sender per shard. Peek and break to avoid the
363 787 : // clone in that situation.
364 787 : if states_iter.peek().is_none() {
365 397 : records_by_sender.entry(shard_sender_id).or_default().push(record);
366 397 : break;
367 390 : } else {
368 390 : records_by_sender.entry(shard_sender_id).or_default().push(record.clone());
369 390 : }
370 : }
371 : }
372 : }
373 :
374 39 : let max_next_record_lsn = match max_next_record_lsn {
375 39 : Some(lsn) => lsn,
376 : None => {
377 0 : continue;
378 : }
379 : };
380 :
381 : // Update the current position such that new receivers can decide
382 : // whether to attach to us or spawn a new WAL reader.
383 39 : match &mut *self.state.write().unwrap() {
384 39 : InterpretedWalReaderState::Running { current_position, .. } => {
385 39 : *current_position = max_next_record_lsn;
386 39 : },
387 : InterpretedWalReaderState::Done => {
388 0 : unreachable!()
389 : }
390 : }
391 :
392 39 : let batch_wal_start_lsn = current_batch_wal_start_lsn.take().unwrap();
393 39 :
394 39 : // Send interpreted records downstream. Anything that has already been seen
395 39 : // by a shard is filtered out.
396 39 : let mut shard_senders_to_remove = Vec::new();
397 91 : for (shard, states) in &mut self.shard_senders {
398 130 : for state in states {
399 78 : let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
400 :
401 78 : let batch = if max_next_record_lsn > state.next_record_lsn {
402 : // This batch contains at least one record that this shard has not
403 : // seen yet.
404 65 : let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
405 65 :
406 65 : InterpretedWalRecords {
407 65 : records,
408 65 : next_record_lsn: max_next_record_lsn,
409 65 : raw_wal_start_lsn: Some(batch_wal_start_lsn),
410 65 : }
411 13 : } else if wal_end_lsn > state.next_record_lsn {
412 : // All the records in this batch were seen by the shard
413 : // However, the batch maps to a chunk of WAL that the
414 : // shard has not yet seen. Notify it of the start LSN
415 : // of the PG WAL chunk such that it doesn't look like a gap.
416 0 : InterpretedWalRecords {
417 0 : records: Vec::default(),
418 0 : next_record_lsn: state.next_record_lsn,
419 0 : raw_wal_start_lsn: Some(batch_wal_start_lsn),
420 0 : }
421 : } else {
422 : // The shard has seen this chunk of WAL before. Skip it.
423 13 : continue;
424 : };
425 :
426 65 : let res = state.tx.send(Batch {
427 65 : wal_end_lsn,
428 65 : available_wal_end_lsn,
429 65 : records: batch,
430 65 : }).await;
431 :
432 65 : if res.is_err() {
433 0 : shard_senders_to_remove.push(shard_sender_id);
434 65 : } else {
435 65 : state.next_record_lsn = std::cmp::max(state.next_record_lsn, max_next_record_lsn);
436 65 : }
437 : }
438 : }
439 :
440 : // Clean up any shard senders that have dropped out.
441 : // This is inefficient, but such events are rare (connection to PS termination)
442 : // and the number of subscriptions on the same shards very small (only one
443 : // for the steady state).
444 39 : for to_remove in shard_senders_to_remove {
445 0 : let shard_senders = self.shard_senders.get_mut(&to_remove.shard()).expect("saw it above");
446 0 : if let Some(idx) = shard_senders.iter().position(|s| s.sender_id == to_remove.sender_id) {
447 0 : shard_senders.remove(idx);
448 0 : tracing::info!("Removed shard sender {}", to_remove);
449 0 : }
450 :
451 0 : if shard_senders.is_empty() {
452 0 : self.shard_senders.remove(&to_remove.shard());
453 0 : }
454 : }
455 : },
456 : // Listen for new shards that want to attach to this reader.
457 : // If the reader is not running as a task, then this is not supported
458 : // (see the pending branch below).
459 44 : notification = match self.shard_notification_rx.as_mut() {
460 44 : Some(rx) => Either::Left(rx.recv()),
461 0 : None => Either::Right(std::future::pending())
462 : } => {
463 3 : if let Some(n) = notification {
464 3 : let AttachShardNotification { shard_id, sender, start_pos } = n;
465 3 :
466 3 : // Update internal and external state, then reset the WAL stream
467 3 : // if required.
468 3 : let senders = self.shard_senders.entry(shard_id).or_default();
469 3 : let new_sender_id = match senders.last() {
470 2 : Some(sender) => sender.sender_id.next(),
471 1 : None => SenderId::first()
472 : };
473 :
474 3 : senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
475 3 :
476 3 : // If the shard is subscribing below the current position the we need
477 3 : // to update the cursor that tracks where we are at in the WAL
478 3 : // ([`Self::state`]) and reset the WAL stream itself
479 3 : // (`[Self::wal_stream`]). This must be done atomically from the POV of
480 3 : // anything outside the select statement.
481 3 : let position_reset = self.state.write().unwrap().maybe_reset(start_pos);
482 3 : match position_reset {
483 2 : CurrentPositionUpdate::Reset(to) => {
484 2 : self.wal_stream.reset(to).await;
485 2 : wal_decoder = WalStreamDecoder::new(to, self.pg_version);
486 : },
487 1 : CurrentPositionUpdate::NotReset(_) => {}
488 : };
489 :
490 3 : tracing::info!(
491 0 : "Added shard sender {} with start_pos={} current_pos={}",
492 0 : ShardSenderId::new(shard_id, new_sender_id), start_pos, position_reset.current_position()
493 : );
494 0 : }
495 : }
496 : }
497 : }
498 0 : }
499 : }
500 :
501 : impl InterpretedWalReaderHandle {
502 : /// Fan-out the reader by attaching a new shard to it
503 3 : pub(crate) fn fanout(
504 3 : &self,
505 3 : shard_id: ShardIdentity,
506 3 : sender: tokio::sync::mpsc::Sender<Batch>,
507 3 : start_pos: Lsn,
508 3 : ) -> Result<(), SendError<AttachShardNotification>> {
509 3 : self.shard_notification_tx.send(AttachShardNotification {
510 3 : shard_id,
511 3 : sender,
512 3 : start_pos,
513 3 : })
514 3 : }
515 :
516 : /// Get the current WAL position of the reader
517 4 : pub(crate) fn current_position(&self) -> Option<Lsn> {
518 4 : self.state.read().unwrap().current_position()
519 4 : }
520 :
521 4 : pub(crate) fn abort(&self) {
522 4 : self.join_handle.abort()
523 4 : }
524 : }
525 :
526 : impl Drop for InterpretedWalReaderHandle {
527 2 : fn drop(&mut self) {
528 2 : tracing::info!("Aborting interpreted wal reader");
529 2 : self.abort()
530 2 : }
531 : }
532 :
/// Forwards [`Batch`]es received from an [`InterpretedWalReader`] (via `rx`)
/// to a client connection (`pgb`), sending keep-alives while idle.
pub(crate) struct InterpretedWalSender<'a, IO> {
    /// Wire format used to serialize interpreted records.
    pub(crate) format: InterpretedFormat,
    /// Optional compression applied during serialization.
    pub(crate) compression: Option<Compression>,
    /// Client application name; used here in the end-of-streaming message.
    pub(crate) appname: Option<String>,

    /// Timeline consulted (`should_walsender_stop`) on idle ticks.
    pub(crate) tli: WalResidentTimeline,
    /// Initial value of the streaming position reported on shutdown.
    pub(crate) start_lsn: Lsn,

    /// Connection to the client.
    pub(crate) pgb: &'a mut PostgresBackend<IO>,
    /// Provides the `wal_end` value sent in keep-alives.
    pub(crate) end_watch_view: EndWatchView,
    /// Used to look up the client's remote consistent LSN.
    pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
    /// Batches produced by the reader for this sender.
    pub(crate) rx: tokio::sync::mpsc::Receiver<Batch>,
}
546 :
547 : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
548 : /// Send interpreted WAL records over the network.
549 : /// Also manages keep-alives if nothing was sent for a while.
550 0 : pub(crate) async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
551 0 : let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
552 0 : keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
553 0 : keepalive_ticker.reset();
554 0 :
555 0 : let mut wal_position = self.start_lsn;
556 :
557 : loop {
558 0 : tokio::select! {
559 0 : batch = self.rx.recv() => {
560 0 : let batch = match batch {
561 0 : Some(b) => b,
562 : None => {
563 0 : return Result::Err(
564 0 : CopyStreamHandlerEnd::Other(anyhow!("Interpreted WAL reader exited early"))
565 0 : );
566 : }
567 : };
568 :
569 0 : wal_position = batch.wal_end_lsn;
570 :
571 0 : let buf = batch
572 0 : .records
573 0 : .to_wire(self.format, self.compression)
574 0 : .await
575 0 : .with_context(|| "Failed to serialize interpreted WAL")
576 0 : .map_err(CopyStreamHandlerEnd::from)?;
577 :
578 : // Reset the keep alive ticker since we are sending something
579 : // over the wire now.
580 0 : keepalive_ticker.reset();
581 0 :
582 0 : self.pgb
583 0 : .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
584 0 : streaming_lsn: batch.wal_end_lsn.0,
585 0 : commit_lsn: batch.available_wal_end_lsn.0,
586 0 : data: &buf,
587 0 : })).await?;
588 : }
589 : // Send a periodic keep alive when the connection has been idle for a while.
590 : // Since we've been idle, also check if we can stop streaming.
591 0 : _ = keepalive_ticker.tick() => {
592 0 : if let Some(remote_consistent_lsn) = self.wal_sender_guard
593 0 : .walsenders()
594 0 : .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
595 : {
596 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
597 : // Stop streaming if the receivers are caught up and
598 : // there's no active compute. This causes the loop in
599 : // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
600 : // to exit and terminate the WAL stream.
601 0 : break;
602 0 : }
603 0 : }
604 :
605 0 : self.pgb
606 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
607 0 : wal_end: self.end_watch_view.get().0,
608 0 : timestamp: get_current_timestamp(),
609 0 : request_reply: true,
610 0 : }))
611 0 : .await?;
612 : },
613 : }
614 : }
615 :
616 0 : Err(CopyStreamHandlerEnd::ServerInitiated(format!(
617 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
618 0 : self.appname, wal_position,
619 0 : )))
620 0 : }
621 : }
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, str::FromStr, time::Duration};

    use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
    use postgres_ffi::MAX_SEND_SIZE;
    use tokio::sync::mpsc::error::TryRecvError;
    use utils::{
        id::{NodeId, TenantTimelineId},
        lsn::Lsn,
        shard::{ShardCount, ShardNumber},
    };

    use crate::{
        send_interpreted_wal::{Batch, InterpretedWalReader},
        test_utils::Env,
        wal_reader_stream::StreamingWalReader,
    };

    /// Fan-out to a second shard: shard 0 reads all WAL first, then shard 1
    /// attaches at the start of WAL (forcing the reader to rewind). Checks that
    /// shard 1 receives no records, that shard 0 receives nothing again, and that
    /// both saw the same sequence of batch `next_record_lsn`s.
    #[tokio::test]
    async fn test_interpreted_wal_reader_fanout() {
        let _ = env_logger::builder().is_test(true).try_init();

        const SIZE: usize = 8 * 1024;
        const MSG_COUNT: usize = 200;
        const PG_VERSION: u32 = 17;
        const SHARD_COUNT: u8 = 2;

        let start_lsn = Lsn::from_str("0/149FD18").unwrap();
        let env = Env::new(true).unwrap();
        let tli = env
            .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
            .await
            .unwrap();

        let resident_tli = tli.wal_residence_guard().await.unwrap();
        let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, None)
            .await
            .unwrap();
        let end_pos = end_watch.get();

        tracing::info!("Doing first round of reads ...");

        let streaming_wal_reader = StreamingWalReader::new(
            resident_tli,
            None,
            start_lsn,
            end_pos,
            end_watch,
            MAX_SEND_SIZE,
        );

        let shard_0 = ShardIdentity::new(
            ShardNumber(0),
            ShardCount(SHARD_COUNT),
            ShardStripeSize::default(),
        )
        .unwrap();

        let shard_1 = ShardIdentity::new(
            ShardNumber(1),
            ShardCount(SHARD_COUNT),
            ShardStripeSize::default(),
        )
        .unwrap();

        // One channel per shard; both halves are wrapped in `Option` so they can be
        // taken out individually below.
        let mut shards = HashMap::new();

        for shard_number in 0..SHARD_COUNT {
            let shard_id = ShardIdentity::new(
                ShardNumber(shard_number),
                ShardCount(SHARD_COUNT),
                ShardStripeSize::default(),
            )
            .unwrap();
            let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
            shards.insert(shard_id, (Some(tx), Some(rx)));
        }

        let shard_0_tx = shards.get_mut(&shard_0).unwrap().0.take().unwrap();
        let mut shard_0_rx = shards.get_mut(&shard_0).unwrap().1.take().unwrap();

        let handle = InterpretedWalReader::spawn(
            streaming_wal_reader,
            start_lsn,
            shard_0_tx,
            shard_0,
            PG_VERSION,
            &Some("pageserver".to_string()),
        );

        tracing::info!("Reading all WAL with only shard 0 attached ...");

        // A batch whose end equals the available WAL end is the last one for now.
        let mut shard_0_interpreted_records = Vec::new();
        while let Some(batch) = shard_0_rx.recv().await {
            shard_0_interpreted_records.push(batch.records);
            if batch.wal_end_lsn == batch.available_wal_end_lsn {
                break;
            }
        }

        let shard_1_tx = shards.get_mut(&shard_1).unwrap().0.take().unwrap();
        let mut shard_1_rx = shards.get_mut(&shard_1).unwrap().1.take().unwrap();

        tracing::info!("Attaching shard 1 to the reader at start of WAL");
        handle.fanout(shard_1, shard_1_tx, start_lsn).unwrap();

        tracing::info!("Reading all WAL with shard 0 and shard 1 attached ...");

        let mut shard_1_interpreted_records = Vec::new();
        while let Some(batch) = shard_1_rx.recv().await {
            shard_1_interpreted_records.push(batch.records);
            if batch.wal_end_lsn == batch.available_wal_end_lsn {
                break;
            }
        }

        // This test uses logical messages. Those only go to shard 0. Check that the
        // filtering worked and shard 1 did not get any.
        assert!(shard_1_interpreted_records
            .iter()
            .all(|recs| recs.records.is_empty()));

        // Shard 0 should not receive anything more since the reader is
        // going through wal that it has already processed.
        let res = shard_0_rx.try_recv();
        if let Ok(ref ok) = res {
            tracing::error!(
                "Shard 0 received batch: wal_end_lsn={} available_wal_end_lsn={}",
                ok.wal_end_lsn,
                ok.available_wal_end_lsn
            );
        }
        assert!(matches!(res, Err(TryRecvError::Empty)));

        // Check that the next records lsns received by the two shards match up.
        let shard_0_next_lsns = shard_0_interpreted_records
            .iter()
            .map(|recs| recs.next_record_lsn)
            .collect::<Vec<_>>();
        let shard_1_next_lsns = shard_1_interpreted_records
            .iter()
            .map(|recs| recs.next_record_lsn)
            .collect::<Vec<_>>();
        assert_eq!(shard_0_next_lsns, shard_1_next_lsns);

        // After abort, the reader's scope guard should flip its state to `Done`
        // (observed as `current_position() == None`) within a few retries.
        handle.abort();
        let mut done = false;
        for _ in 0..5 {
            if handle.current_position().is_none() {
                done = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
        }

        assert!(done);
    }

    /// Several senders on the same shard with different, out-of-order start
    /// positions. Each must receive exactly the records after its own start
    /// position, in order.
    #[tokio::test]
    async fn test_interpreted_wal_reader_same_shard_fanout() {
        let _ = env_logger::builder().is_test(true).try_init();

        const SIZE: usize = 8 * 1024;
        const MSG_COUNT: usize = 200;
        const PG_VERSION: u32 = 17;
        const SHARD_COUNT: u8 = 2;

        let start_lsn = Lsn::from_str("0/149FD18").unwrap();
        let env = Env::new(true).unwrap();
        let tli = env
            .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
            .await
            .unwrap();

        let resident_tli = tli.wal_residence_guard().await.unwrap();
        // Filled by `write_wal`; used both to pick start positions and as the
        // expected output.
        let mut next_record_lsns = Vec::default();
        let end_watch =
            Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT, Some(&mut next_record_lsns))
                .await
                .unwrap();
        let end_pos = end_watch.get();

        let streaming_wal_reader = StreamingWalReader::new(
            resident_tli,
            None,
            start_lsn,
            end_pos,
            end_watch,
            MAX_SEND_SIZE,
        );

        let shard_0 = ShardIdentity::new(
            ShardNumber(0),
            ShardCount(SHARD_COUNT),
            ShardStripeSize::default(),
        )
        .unwrap();

        // Test-local bookkeeping for one sender attached to the reader.
        struct Sender {
            tx: Option<tokio::sync::mpsc::Sender<Batch>>,
            rx: tokio::sync::mpsc::Receiver<Batch>,
            shard: ShardIdentity,
            start_lsn: Lsn,
            received_next_record_lsns: Vec<Lsn>,
        }

        impl Sender {
            fn new(start_lsn: Lsn, shard: ShardIdentity) -> Self {
                let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
                Self {
                    tx: Some(tx),
                    rx,
                    shard,
                    start_lsn,
                    received_next_record_lsns: Vec::default(),
                }
            }
        }

        // The first sender starts furthest ahead; the later ones attach below it,
        // which makes the reader rewind.
        assert!(next_record_lsns.len() > 7);
        let start_lsns = vec![
            next_record_lsns[5],
            next_record_lsns[1],
            next_record_lsns[3],
        ];
        let mut senders = start_lsns
            .into_iter()
            .map(|lsn| Sender::new(lsn, shard_0))
            .collect::<Vec<_>>();

        let first_sender = senders.first_mut().unwrap();
        let handle = InterpretedWalReader::spawn(
            streaming_wal_reader,
            first_sender.start_lsn,
            first_sender.tx.take().unwrap(),
            first_sender.shard,
            PG_VERSION,
            &Some("pageserver".to_string()),
        );

        for sender in senders.iter_mut().skip(1) {
            handle
                .fanout(sender.shard, sender.tx.take().unwrap(), sender.start_lsn)
                .unwrap();
        }

        for sender in senders.iter_mut() {
            loop {
                let batch = sender.rx.recv().await.unwrap();
                tracing::info!(
                    "Sender with start_lsn={} received batch ending at {} with {} records",
                    sender.start_lsn,
                    batch.wal_end_lsn,
                    batch.records.records.len()
                );

                for rec in batch.records.records {
                    sender.received_next_record_lsns.push(rec.next_record_lsn);
                }

                if batch.wal_end_lsn == batch.available_wal_end_lsn {
                    break;
                }
            }
        }

        handle.abort();
        let mut done = false;
        for _ in 0..5 {
            if handle.current_position().is_none() {
                done = true;
                break;
            }
            tokio::time::sleep(Duration::from_millis(1)).await;
        }

        assert!(done);

        for sender in senders {
            tracing::info!(
                "Validating records received by sender with start_lsn={}",
                sender.start_lsn
            );

            // Exactly the records strictly after the sender's start, in order.
            assert!(sender.received_next_record_lsns.is_sorted());
            let expected = next_record_lsns
                .iter()
                .filter(|lsn| **lsn > sender.start_lsn)
                .copied()
                .collect::<Vec<_>>();
            assert_eq!(sender.received_next_record_lsns, expected);
        }
    }
}
|