Line data Source code
1 : use std::collections::HashMap;
2 : use std::fmt::Display;
3 : use std::sync::Arc;
4 : use std::time::Duration;
5 :
6 : use anyhow::{anyhow, Context};
7 : use futures::future::Either;
8 : use futures::StreamExt;
9 : use pageserver_api::shard::ShardIdentity;
10 : use postgres_backend::{CopyStreamHandlerEnd, PostgresBackend};
11 : use postgres_ffi::waldecoder::WalDecodeError;
12 : use postgres_ffi::{get_current_timestamp, waldecoder::WalStreamDecoder};
13 : use pq_proto::{BeMessage, InterpretedWalRecordsBody, WalSndKeepAlive};
14 : use tokio::io::{AsyncRead, AsyncWrite};
15 : use tokio::sync::mpsc::error::SendError;
16 : use tokio::task::JoinHandle;
17 : use tokio::time::MissedTickBehavior;
18 : use tracing::{info_span, Instrument};
19 : use utils::lsn::Lsn;
20 : use utils::postgres_client::Compression;
21 : use utils::postgres_client::InterpretedFormat;
22 : use wal_decoder::models::{InterpretedWalRecord, InterpretedWalRecords};
23 : use wal_decoder::wire_format::ToWireFormat;
24 :
25 : use crate::metrics::WAL_READERS;
26 : use crate::send_wal::{EndWatchView, WalSenderGuard};
27 : use crate::timeline::WalResidentTimeline;
28 : use crate::wal_reader_stream::{StreamingWalReader, WalBytes};
29 :
30 : /// Identifier used to differentiate between senders of the same
31 : /// shard.
32 : ///
33 : /// In the steady state there's only one, but two pageservers may
34 : /// temporarily have the same shard attached and attempt to ingest
35 : /// WAL for it. See also [`ShardSenderId`].
36 : #[derive(Hash, Eq, PartialEq, Copy, Clone)]
37 : struct SenderId(u8);
38 :
39 : impl SenderId {
40 3 : fn first() -> Self {
41 3 : SenderId(0)
42 3 : }
43 :
44 3 : fn next(&self) -> Self {
45 3 : SenderId(self.0.checked_add(1).expect("few senders"))
46 3 : }
47 : }
48 :
49 : #[derive(Hash, Eq, PartialEq)]
50 : struct ShardSenderId {
51 : shard: ShardIdentity,
52 : sender_id: SenderId,
53 : }
54 :
55 : impl Display for ShardSenderId {
56 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57 0 : write!(f, "{}{}", self.sender_id.0, self.shard.shard_slug())
58 0 : }
59 : }
60 :
61 : impl ShardSenderId {
62 78 : fn new(shard: ShardIdentity, sender_id: SenderId) -> Self {
63 78 : ShardSenderId { shard, sender_id }
64 78 : }
65 :
66 0 : fn shard(&self) -> ShardIdentity {
67 0 : self.shard
68 0 : }
69 : }
70 :
71 : /// Shard-aware fan-out interpreted record reader.
72 : /// Reads WAL from disk, decodes it, interprets it, and sends
73 : /// it to any [`InterpretedWalSender`] connected to it.
74 : /// Each [`InterpretedWalSender`] corresponds to one shard
75 : /// and gets interpreted records concerning that shard only.
76 : pub(crate) struct InterpretedWalReader {
77 : wal_stream: StreamingWalReader,
78 : shard_senders: HashMap<ShardIdentity, smallvec::SmallVec<[ShardSenderState; 1]>>,
79 : shard_notification_rx: Option<tokio::sync::mpsc::UnboundedReceiver<AttachShardNotification>>,
80 : state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
81 : pg_version: u32,
82 : }
83 :
84 : /// A handle for [`InterpretedWalReader`] which allows interacting with it
85 : /// when it runs as a separate tokio task.
86 : #[derive(Debug)]
87 : pub(crate) struct InterpretedWalReaderHandle {
88 : join_handle: JoinHandle<Result<(), InterpretedWalReaderError>>,
89 : state: Arc<std::sync::RwLock<InterpretedWalReaderState>>,
90 : shard_notification_tx: tokio::sync::mpsc::UnboundedSender<AttachShardNotification>,
91 : }
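
A minimal caller-side sketch of the intended lifecycle, assuming the caller already holds a `StreamingWalReader`, shard identities and a start LSN (`wal_stream`, `start_pos`, `shard_a`, `shard_b` and `pg_version` are illustrative names, not code from this file): spawn the reader for the first shard, attach another sender later through the handle, and abort on teardown.

```rust
// Hypothetical caller-side sketch; names are assumptions of the caller's scope.
let (tx_a, _rx_a) = tokio::sync::mpsc::channel::<Batch>(2);
let handle = InterpretedWalReader::spawn(
    wal_stream,
    start_pos,
    tx_a,
    shard_a,
    pg_version,
    &Some("pageserver".to_string()),
);

// A second sender attaches to the same running reader instead of spawning
// its own WAL read path.
let (tx_b, _rx_b) = tokio::sync::mpsc::channel::<Batch>(2);
handle.fanout(shard_b, tx_b, start_pos).expect("reader task is alive");

// Explicit shutdown; dropping the handle has the same effect via `Drop`.
handle.abort();
```
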
92 :
93 : struct ShardSenderState {
94 : sender_id: SenderId,
95 : tx: tokio::sync::mpsc::Sender<Batch>,
96 : next_record_lsn: Lsn,
97 : }
98 :
99 : /// State of [`InterpretedWalReader`] visible outside of the task running it.
100 : #[derive(Debug)]
101 : pub(crate) enum InterpretedWalReaderState {
102 : Running { current_position: Lsn },
103 : Done,
104 : }
105 :
106 : pub(crate) struct Batch {
107 : wal_end_lsn: Lsn,
108 : available_wal_end_lsn: Lsn,
109 : records: InterpretedWalRecords,
110 : }
111 :
112 : #[derive(thiserror::Error, Debug)]
113 : pub enum InterpretedWalReaderError {
114 : /// Handler initiates the end of streaming.
115 : #[error("decode error: {0}")]
116 : Decode(#[from] WalDecodeError),
117 : #[error("read or interpret error: {0}")]
118 : ReadOrInterpret(#[from] anyhow::Error),
119 : #[error("wal stream closed")]
120 : WalStreamClosed,
121 : }
122 :
123 : impl InterpretedWalReaderState {
124 8 : fn current_position(&self) -> Option<Lsn> {
125 8 : match self {
126 : InterpretedWalReaderState::Running {
127 6 : current_position, ..
128 6 : } => Some(*current_position),
129 2 : InterpretedWalReaderState::Done => None,
130 : }
131 8 : }
132 : }
133 :
134 : pub(crate) struct AttachShardNotification {
135 : shard_id: ShardIdentity,
136 : sender: tokio::sync::mpsc::Sender<Batch>,
137 : start_pos: Lsn,
138 : }
139 :
140 : impl InterpretedWalReader {
141 : /// Spawn the reader in a separate tokio task and return a handle
142 2 : pub(crate) fn spawn(
143 2 : wal_stream: StreamingWalReader,
144 2 : start_pos: Lsn,
145 2 : tx: tokio::sync::mpsc::Sender<Batch>,
146 2 : shard: ShardIdentity,
147 2 : pg_version: u32,
148 2 : appname: &Option<String>,
149 2 : ) -> InterpretedWalReaderHandle {
150 2 : let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
151 2 : current_position: start_pos,
152 2 : }));
153 2 :
154 2 : let (shard_notification_tx, shard_notification_rx) = tokio::sync::mpsc::unbounded_channel();
155 :
156 2 : let reader = InterpretedWalReader {
157 2 : wal_stream,
158 2 : shard_senders: HashMap::from([(
159 2 : shard,
160 2 : smallvec::smallvec![ShardSenderState {
161 0 : sender_id: SenderId::first(),
162 0 : tx,
163 0 : next_record_lsn: start_pos,
164 0 : }],
165 : )]),
166 2 : shard_notification_rx: Some(shard_notification_rx),
167 2 : state: state.clone(),
168 2 : pg_version,
169 2 : };
170 2 :
171 2 : let metric = WAL_READERS
172 2 : .get_metric_with_label_values(&["task", appname.as_deref().unwrap_or("safekeeper")])
173 2 : .unwrap();
174 :
175 2 : let join_handle = tokio::task::spawn(
176 2 : async move {
177 2 : metric.inc();
178 2 : scopeguard::defer! {
179 2 : metric.dec();
180 2 : }
181 :
182 2 : let res = reader.run_impl(start_pos).await;
183 0 : if let Err(ref err) = res {
184 0 : tracing::error!("Task finished with error: {err}");
185 0 : }
186 0 : res
187 0 : }
188 2 : .instrument(info_span!("interpreted wal reader")),
189 : );
190 :
191 2 : InterpretedWalReaderHandle {
192 2 : join_handle,
193 2 : state,
194 2 : shard_notification_tx,
195 2 : }
196 2 : }
197 :
198 : /// Construct the reader without spawning anything.
199 : /// Callers should drive the future returned by [`Self::run`].
200 0 : pub(crate) fn new(
201 0 : wal_stream: StreamingWalReader,
202 0 : start_pos: Lsn,
203 0 : tx: tokio::sync::mpsc::Sender<Batch>,
204 0 : shard: ShardIdentity,
205 0 : pg_version: u32,
206 0 : ) -> InterpretedWalReader {
207 0 : let state = Arc::new(std::sync::RwLock::new(InterpretedWalReaderState::Running {
208 0 : current_position: start_pos,
209 0 : }));
210 0 :
211 0 : InterpretedWalReader {
212 0 : wal_stream,
213 0 : shard_senders: HashMap::from([(
214 0 : shard,
215 0 : smallvec::smallvec![ShardSenderState {
216 0 : sender_id: SenderId::first(),
217 0 : tx,
218 0 : next_record_lsn: start_pos,
219 0 : }],
220 : )]),
221 0 : shard_notification_rx: None,
222 0 : state: state.clone(),
223 0 : pg_version,
224 0 : }
225 0 : }
226 :
227 : /// Entry point for future (polling) based wal reader.
228 0 : pub(crate) async fn run(
229 0 : self,
230 0 : start_pos: Lsn,
231 0 : appname: &Option<String>,
232 0 : ) -> Result<(), CopyStreamHandlerEnd> {
233 0 : let metric = WAL_READERS
234 0 : .get_metric_with_label_values(&["future", appname.as_deref().unwrap_or("safekeeper")])
235 0 : .unwrap();
236 0 :
237 0 : metric.inc();
238 0 : scopeguard::defer! {
239 0 : metric.dec();
240 0 : }
241 :
242 0 : let res = self.run_impl(start_pos).await;
243 0 : if let Err(err) = res {
244 0 : tracing::error!("Interpreted wal reader encountered error: {err}");
245 : } else {
246 0 : tracing::info!("Interpreted wal reader exiting");
247 : }
248 :
249 0 : Err(CopyStreamHandlerEnd::Other(anyhow!(
250 0 : "interpreted wal reader finished"
251 0 : )))
252 0 : }
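
For completeness, a sketch of the non-spawned path described above, assuming the same inputs as for `spawn` (illustrative names only): build the reader with `new` and drive the returned future directly. `run` only comes back once streaming ends, always as a `CopyStreamHandlerEnd`.

```rust
// Hypothetical sketch of driving the reader as a plain future instead of a task.
let (tx, _rx) = tokio::sync::mpsc::channel::<Batch>(2);
let reader = InterpretedWalReader::new(wal_stream, start_pos, tx, shard, pg_version);

// The caller typically polls this concurrently with the consumer of `_rx`;
// note that fan-out via `AttachShardNotification` is not available on this path.
let end: CopyStreamHandlerEnd = reader
    .run(start_pos, &appname)
    .await
    .expect_err("run always ends with CopyStreamHandlerEnd");
```
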
253 :
254 : /// Send interpreted WAL to one or more [`InterpretedWalSender`]s.
255 : /// Stops when an error is encountered or when the [`InterpretedWalReaderHandle`]
256 : /// goes out of scope.
257 2 : async fn run_impl(mut self, start_pos: Lsn) -> Result<(), InterpretedWalReaderError> {
258 2 : let defer_state = self.state.clone();
259 2 : scopeguard::defer! {
260 2 : *defer_state.write().unwrap() = InterpretedWalReaderState::Done;
261 2 : }
262 2 :
263 2 : let mut wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
264 :
265 : loop {
266 45 : tokio::select! {
267 : // Main branch for reading WAL and forwarding it
268 45 : wal_or_reset = self.wal_stream.next() => {
269 39 : let wal = wal_or_reset.map(|wor| wor.get_wal().expect("reset handled in select branch below"));
270 : let WalBytes {
271 39 : wal,
272 39 : wal_start_lsn: _,
273 39 : wal_end_lsn,
274 39 : available_wal_end_lsn,
275 39 : } = match wal {
276 39 : Some(some) => some.map_err(InterpretedWalReaderError::ReadOrInterpret)?,
277 : None => {
278 : // [`StreamingWalReader::next`] is an endless stream of WAL.
279 : // It shouldn't ever finish unless it panicked or became internally
280 : // inconsistent.
281 0 : return Result::Err(InterpretedWalReaderError::WalStreamClosed);
282 : }
283 : };
284 :
285 39 : wal_decoder.feed_bytes(&wal);
286 39 :
287 39 : // Deserialize and interpret WAL records from this batch of WAL.
288 39 : // Interpreted records for each shard are collected separately.
289 39 : let shard_ids = self.shard_senders.keys().copied().collect::<Vec<_>>();
290 39 : let mut records_by_sender: HashMap<ShardSenderId, Vec<InterpretedWalRecord>> = HashMap::new();
291 39 : let mut max_next_record_lsn = None;
292 636 : while let Some((next_record_lsn, recdata)) = wal_decoder.poll_decode()?
293 : {
294 597 : assert!(next_record_lsn.is_aligned());
295 597 : max_next_record_lsn = Some(next_record_lsn);
296 :
297 597 : let interpreted = InterpretedWalRecord::from_bytes_filtered(
298 597 : recdata,
299 597 : &shard_ids,
300 597 : next_record_lsn,
301 597 : self.pg_version,
302 597 : )
303 597 : .with_context(|| "Failed to interpret WAL")?;
304 :
305 1393 : for (shard, record) in interpreted {
306 796 : if record.is_empty() {
307 796 : continue;
308 0 : }
309 0 :
310 0 : let mut states_iter = self.shard_senders
311 0 : .get(&shard)
312 0 : .expect("keys collected above")
313 0 : .iter()
314 0 : .filter(|state| record.next_record_lsn > state.next_record_lsn)
315 0 : .peekable();
316 0 : while let Some(state) = states_iter.next() {
317 0 : let shard_sender_id = ShardSenderId::new(shard, state.sender_id);
318 0 :
319 0 : // The most common case is one sender per shard. Peek and break to avoid the
320 0 : // clone in that situation.
321 0 : if states_iter.peek().is_none() {
322 0 : records_by_sender.entry(shard_sender_id).or_default().push(record);
323 0 : break;
324 0 : } else {
325 0 : records_by_sender.entry(shard_sender_id).or_default().push(record.clone());
326 0 : }
327 : }
328 : }
329 : }
330 :
331 39 : let max_next_record_lsn = match max_next_record_lsn {
332 39 : Some(lsn) => lsn,
333 0 : None => { continue; }
334 : };
335 :
336 : // Update the current position such that new receivers can decide
337 : // whether to attach to us or spawn a new WAL reader.
338 39 : match &mut *self.state.write().unwrap() {
339 39 : InterpretedWalReaderState::Running { current_position, .. } => {
340 39 : *current_position = max_next_record_lsn;
341 39 : },
342 : InterpretedWalReaderState::Done => {
343 0 : unreachable!()
344 : }
345 : }
346 :
347 : // Send interpreted records downstream. Anything that has already been seen
348 : // by a shard is filtered out.
349 39 : let mut shard_senders_to_remove = Vec::new();
350 91 : for (shard, states) in &mut self.shard_senders {
351 143 : for state in states {
352 91 : if max_next_record_lsn <= state.next_record_lsn {
353 13 : continue;
354 78 : }
355 78 :
356 78 : let shard_sender_id = ShardSenderId::new(*shard, state.sender_id);
357 78 : let records = records_by_sender.remove(&shard_sender_id).unwrap_or_default();
358 78 :
359 78 : let batch = InterpretedWalRecords {
360 78 : records,
361 78 : next_record_lsn: Some(max_next_record_lsn),
362 78 : };
363 :
364 78 : let res = state.tx.send(Batch {
365 78 : wal_end_lsn,
366 78 : available_wal_end_lsn,
367 78 : records: batch,
368 78 : }).await;
369 :
370 78 : if res.is_err() {
371 0 : shard_senders_to_remove.push(shard_sender_id);
372 78 : } else {
373 78 : state.next_record_lsn = max_next_record_lsn;
374 78 : }
375 : }
376 : }
377 :
378 : // Clean up any shard senders that have dropped out.
379 : // This is inefficient, but such events are rare (a pageserver connection terminating)
380 : // and the number of subscriptions on the same shard is very small (only one
381 : // in the steady state).
382 39 : for to_remove in shard_senders_to_remove {
383 0 : let shard_senders = self.shard_senders.get_mut(&to_remove.shard()).expect("saw it above");
384 0 : if let Some(idx) = shard_senders.iter().position(|s| s.sender_id == to_remove.sender_id) {
385 0 : shard_senders.remove(idx);
386 0 : tracing::info!("Removed shard sender {}", to_remove);
387 0 : }
388 :
389 0 : if shard_senders.is_empty() {
390 0 : self.shard_senders.remove(&to_remove.shard());
391 0 : }
392 : }
393 : },
394 : // Listen for new shards that want to attach to this reader.
395 : // If the reader is not running as a task, then this is not supported
396 : // (see the pending branch below).
397 45 : notification = match self.shard_notification_rx.as_mut() {
398 45 : Some(rx) => Either::Left(rx.recv()),
399 0 : None => Either::Right(std::future::pending())
400 : } => {
401 4 : if let Some(n) = notification {
402 4 : let AttachShardNotification { shard_id, sender, start_pos } = n;
403 4 :
404 4 : // Update internal and external state, then reset the WAL stream
405 4 : // if required.
406 4 : let senders = self.shard_senders.entry(shard_id).or_default();
407 4 : let new_sender_id = match senders.last() {
408 3 : Some(sender) => sender.sender_id.next(),
409 1 : None => SenderId::first()
410 : };
411 :
412 4 : senders.push(ShardSenderState { sender_id: new_sender_id, tx: sender, next_record_lsn: start_pos});
413 4 : let current_pos = self.state.read().unwrap().current_position().unwrap();
414 4 : if start_pos < current_pos {
415 1 : self.wal_stream.reset(start_pos).await;
416 1 : wal_decoder = WalStreamDecoder::new(start_pos, self.pg_version);
417 3 : }
418 :
419 4 : tracing::info!(
420 0 : "Added shard sender {} with start_pos={} current_pos={}",
421 0 : ShardSenderId::new(shard_id, new_sender_id), start_pos, current_pos
422 : );
423 0 : }
424 : }
425 : }
426 : }
427 0 : }
428 : }
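
The routing loop in `run_impl` clones an interpreted record only when more than one sender still needs it; the last (and usually only) sender takes it by move. A self-contained sketch of that peek-and-break pattern:

```rust
// Standalone illustration of the peek-and-break idiom used above: every sink
// except the last receives a clone, the last one takes the value by move, so
// the common single-sink case never clones.
fn route<T: Clone>(value: T, sinks: &mut [Vec<T>]) {
    let mut iter = sinks.iter_mut().peekable();
    while let Some(sink) = iter.next() {
        if iter.peek().is_none() {
            sink.push(value);
            break;
        } else {
            sink.push(value.clone());
        }
    }
}

fn main() {
    let mut sinks = vec![Vec::new(), Vec::new()];
    route(String::from("record"), &mut sinks);
    assert_eq!(sinks[0], ["record"]);
    assert_eq!(sinks[1], ["record"]);
}
```
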
429 :
430 : impl InterpretedWalReaderHandle {
431 : /// Fan-out the reader by attaching a new shard to it
432 4 : pub(crate) fn fanout(
433 4 : &self,
434 4 : shard_id: ShardIdentity,
435 4 : sender: tokio::sync::mpsc::Sender<Batch>,
436 4 : start_pos: Lsn,
437 4 : ) -> Result<(), SendError<AttachShardNotification>> {
438 4 : self.shard_notification_tx.send(AttachShardNotification {
439 4 : shard_id,
440 4 : sender,
441 4 : start_pos,
442 4 : })
443 4 : }
444 :
445 : /// Get the current WAL position of the reader
446 4 : pub(crate) fn current_position(&self) -> Option<Lsn> {
447 4 : self.state.read().unwrap().current_position()
448 4 : }
449 :
450 4 : pub(crate) fn abort(&self) {
451 4 : self.join_handle.abort()
452 4 : }
453 : }
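
`current_position` is what lets a new subscriber choose between attaching to an existing reader and spawning its own, as hinted at in `run_impl`. A hypothetical caller-side sketch of that decision (`existing_handle`, `spawn_new_reader`, `shard`, `tx` and `start_pos` are illustrative, not code from this file):

```rust
// Hypothetical attach-or-spawn decision. A running reader reports Some(lsn);
// once it is Done (or aborted) it reports None and a fresh reader is needed.
let handle = match existing_handle {
    Some(handle) if handle.current_position().is_some() => {
        // Attach to the running reader; it rewinds its WAL stream internally
        // if `start_pos` is behind its current position.
        handle.fanout(shard, tx, start_pos).expect("reader task is alive");
        handle
    }
    _ => spawn_new_reader(shard, tx, start_pos),
};
```
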
454 :
455 : impl Drop for InterpretedWalReaderHandle {
456 2 : fn drop(&mut self) {
457 2 : tracing::info!("Aborting interpreted wal reader");
458 2 : self.abort()
459 2 : }
460 : }
461 :
462 : pub(crate) struct InterpretedWalSender<'a, IO> {
463 : pub(crate) format: InterpretedFormat,
464 : pub(crate) compression: Option<Compression>,
465 : pub(crate) appname: Option<String>,
466 :
467 : pub(crate) tli: WalResidentTimeline,
468 : pub(crate) start_lsn: Lsn,
469 :
470 : pub(crate) pgb: &'a mut PostgresBackend<IO>,
471 : pub(crate) end_watch_view: EndWatchView,
472 : pub(crate) wal_sender_guard: Arc<WalSenderGuard>,
473 : pub(crate) rx: tokio::sync::mpsc::Receiver<Batch>,
474 : }
475 :
476 : impl<IO: AsyncRead + AsyncWrite + Unpin> InterpretedWalSender<'_, IO> {
477 : /// Send interpreted WAL records over the network.
478 : /// Also manages keep-alives if nothing was sent for a while.
479 0 : pub(crate) async fn run(mut self) -> Result<(), CopyStreamHandlerEnd> {
480 0 : let mut keepalive_ticker = tokio::time::interval(Duration::from_secs(1));
481 0 : keepalive_ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
482 0 : keepalive_ticker.reset();
483 0 :
484 0 : let mut wal_position = self.start_lsn;
485 :
486 : loop {
487 0 : tokio::select! {
488 0 : batch = self.rx.recv() => {
489 0 : let batch = match batch {
490 0 : Some(b) => b,
491 : None => {
492 0 : return Result::Err(
493 0 : CopyStreamHandlerEnd::Other(anyhow!("Interpreted WAL reader exited early"))
494 0 : );
495 : }
496 : };
497 :
498 0 : wal_position = batch.wal_end_lsn;
499 :
500 0 : let buf = batch
501 0 : .records
502 0 : .to_wire(self.format, self.compression)
503 0 : .await
504 0 : .with_context(|| "Failed to serialize interpreted WAL")
505 0 : .map_err(CopyStreamHandlerEnd::from)?;
506 :
507 : // Reset the keep alive ticker since we are sending something
508 : // over the wire now.
509 0 : keepalive_ticker.reset();
510 0 :
511 0 : self.pgb
512 0 : .write_message(&BeMessage::InterpretedWalRecords(InterpretedWalRecordsBody {
513 0 : streaming_lsn: batch.wal_end_lsn.0,
514 0 : commit_lsn: batch.available_wal_end_lsn.0,
515 0 : data: &buf,
516 0 : })).await?;
517 : }
518 : // Send a periodic keep alive when the connection has been idle for a while.
519 : // Since we've been idle, also check if we can stop streaming.
520 0 : _ = keepalive_ticker.tick() => {
521 0 : if let Some(remote_consistent_lsn) = self.wal_sender_guard
522 0 : .walsenders()
523 0 : .get_ws_remote_consistent_lsn(self.wal_sender_guard.id())
524 : {
525 0 : if self.tli.should_walsender_stop(remote_consistent_lsn).await {
526 : // Stop streaming if the receivers are caught up and
527 : // there's no active compute. This causes the loop in
528 : // [`crate::send_interpreted_wal::InterpretedWalSender::run`]
529 : // to exit and terminate the WAL stream.
530 0 : break;
531 0 : }
532 0 : }
533 :
534 0 : self.pgb
535 0 : .write_message(&BeMessage::KeepAlive(WalSndKeepAlive {
536 0 : wal_end: self.end_watch_view.get().0,
537 0 : timestamp: get_current_timestamp(),
538 0 : request_reply: true,
539 0 : }))
540 0 : .await?;
541 : },
542 : }
543 : }
544 :
545 0 : Err(CopyStreamHandlerEnd::ServerInitiated(format!(
546 0 : "ending streaming to {:?} at {}, receiver is caughtup and there is no computes",
547 0 : self.appname, wal_position,
548 0 : )))
549 0 : }
550 : }
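
The keep-alive handling in `run` is a common tokio ticker pattern: reset the interval whenever real data goes out so keep-alives only fire after a quiet second. A self-contained sketch of the same idea, with placeholder payloads instead of the actual protocol messages:

```rust
use std::time::Duration;
use tokio::time::MissedTickBehavior;

// Standalone sketch: forward batches from a channel and emit a keep-alive
// only when nothing has been sent for a full second.
async fn pump(mut rx: tokio::sync::mpsc::Receiver<Vec<u8>>) {
    let mut keepalive = tokio::time::interval(Duration::from_secs(1));
    keepalive.set_missed_tick_behavior(MissedTickBehavior::Skip);
    keepalive.reset();

    loop {
        tokio::select! {
            maybe_batch = rx.recv() => {
                let Some(batch) = maybe_batch else { break };
                println!("send {} bytes", batch.len());
                // Real traffic counts as liveness: push the next tick back.
                keepalive.reset();
            }
            _ = keepalive.tick() => {
                println!("send keep-alive");
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = tokio::sync::mpsc::channel(8);
    tokio::spawn(async move {
        tx.send(b"hello".to_vec()).await.ok();
        tokio::time::sleep(Duration::from_secs(3)).await;
        // Dropping `tx` ends the pump loop.
    });
    pump(rx).await;
}
```
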
551 : #[cfg(test)]
552 : mod tests {
553 : use std::{collections::HashMap, str::FromStr, time::Duration};
554 :
555 : use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
556 : use postgres_ffi::MAX_SEND_SIZE;
557 : use tokio::sync::mpsc::error::TryRecvError;
558 : use utils::{
559 : id::{NodeId, TenantTimelineId},
560 : lsn::Lsn,
561 : shard::{ShardCount, ShardNumber},
562 : };
563 :
564 : use crate::{
565 : send_interpreted_wal::{Batch, InterpretedWalReader},
566 : test_utils::Env,
567 : wal_reader_stream::StreamingWalReader,
568 : };
569 :
570 : #[tokio::test]
571 1 : async fn test_interpreted_wal_reader_fanout() {
572 1 : let _ = env_logger::builder().is_test(true).try_init();
573 1 :
574 1 : const SIZE: usize = 8 * 1024;
575 1 : const MSG_COUNT: usize = 200;
576 1 : const PG_VERSION: u32 = 17;
577 1 : const SHARD_COUNT: u8 = 2;
578 1 :
579 1 : let start_lsn = Lsn::from_str("0/149FD18").unwrap();
580 1 : let env = Env::new(true).unwrap();
581 1 : let tli = env
582 1 : .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
583 1 : .await
584 1 : .unwrap();
585 1 :
586 1 : let resident_tli = tli.wal_residence_guard().await.unwrap();
587 1 : let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
588 1 : .await
589 1 : .unwrap();
590 1 : let end_pos = end_watch.get();
591 1 :
592 1 : tracing::info!("Doing first round of reads ...");
593 1 :
594 1 : let streaming_wal_reader = StreamingWalReader::new(
595 1 : resident_tli,
596 1 : None,
597 1 : start_lsn,
598 1 : end_pos,
599 1 : end_watch,
600 1 : MAX_SEND_SIZE,
601 1 : );
602 1 :
603 1 : let shard_0 = ShardIdentity::new(
604 1 : ShardNumber(0),
605 1 : ShardCount(SHARD_COUNT),
606 1 : ShardStripeSize::default(),
607 1 : )
608 1 : .unwrap();
609 1 :
610 1 : let shard_1 = ShardIdentity::new(
611 1 : ShardNumber(1),
612 1 : ShardCount(SHARD_COUNT),
613 1 : ShardStripeSize::default(),
614 1 : )
615 1 : .unwrap();
616 1 :
617 1 : let mut shards = HashMap::new();
618 1 :
619 3 : for shard_number in 0..SHARD_COUNT {
620 2 : let shard_id = ShardIdentity::new(
621 2 : ShardNumber(shard_number),
622 2 : ShardCount(SHARD_COUNT),
623 2 : ShardStripeSize::default(),
624 2 : )
625 2 : .unwrap();
626 2 : let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
627 2 : shards.insert(shard_id, (Some(tx), Some(rx)));
628 2 : }
629 1 :
630 1 : let shard_0_tx = shards.get_mut(&shard_0).unwrap().0.take().unwrap();
631 1 : let mut shard_0_rx = shards.get_mut(&shard_0).unwrap().1.take().unwrap();
632 1 :
633 1 : let handle = InterpretedWalReader::spawn(
634 1 : streaming_wal_reader,
635 1 : start_lsn,
636 1 : shard_0_tx,
637 1 : shard_0,
638 1 : PG_VERSION,
639 1 : &Some("pageserver".to_string()),
640 1 : );
641 1 :
642 1 : tracing::info!("Reading all WAL with only shard 0 attached ...");
643 1 :
644 1 : let mut shard_0_interpreted_records = Vec::new();
645 13 : while let Some(batch) = shard_0_rx.recv().await {
646 13 : shard_0_interpreted_records.push(batch.records);
647 13 : if batch.wal_end_lsn == batch.available_wal_end_lsn {
648 1 : break;
649 12 : }
650 1 : }
651 1 :
652 1 : let shard_1_tx = shards.get_mut(&shard_1).unwrap().0.take().unwrap();
653 1 : let mut shard_1_rx = shards.get_mut(&shard_1).unwrap().1.take().unwrap();
654 1 :
655 1 : tracing::info!("Attaching shard 1 to the reader at start of WAL");
656 1 : handle.fanout(shard_1, shard_1_tx, start_lsn).unwrap();
657 1 :
658 1 : tracing::info!("Reading all WAL with shard 0 and shard 1 attached ...");
659 1 :
660 1 : let mut shard_1_interpreted_records = Vec::new();
661 13 : while let Some(batch) = shard_1_rx.recv().await {
662 13 : shard_1_interpreted_records.push(batch.records);
663 13 : if batch.wal_end_lsn == batch.available_wal_end_lsn {
664 1 : break;
665 12 : }
666 1 : }
667 1 :
668 1 : // This test uses logical messages. Those only go to shard 0. Check that the
669 1 : // filtering worked and shard 1 did not get any.
670 1 : assert!(shard_1_interpreted_records
671 1 : .iter()
672 13 : .all(|recs| recs.records.is_empty()));
673 1 :
674 1 : // Shard 0 should not receive anything more since the reader is
675 1 : // going through WAL that it has already processed.
676 1 : let res = shard_0_rx.try_recv();
677 1 : if let Ok(ref ok) = res {
678 1 : tracing::error!(
679 1 : "Shard 0 received batch: wal_end_lsn={} available_wal_end_lsn={}",
680 1 : ok.wal_end_lsn,
681 1 : ok.available_wal_end_lsn
682 1 : );
683 1 : }
684 1 : assert!(matches!(res, Err(TryRecvError::Empty)));
685 1 :
686 1 : // Check that the next records lsns received by the two shards match up.
687 1 : let shard_0_next_lsns = shard_0_interpreted_records
688 1 : .iter()
689 13 : .map(|recs| recs.next_record_lsn)
690 1 : .collect::<Vec<_>>();
691 1 : let shard_1_next_lsns = shard_1_interpreted_records
692 1 : .iter()
693 13 : .map(|recs| recs.next_record_lsn)
694 1 : .collect::<Vec<_>>();
695 1 : assert_eq!(shard_0_next_lsns, shard_1_next_lsns);
696 1 :
697 1 : handle.abort();
698 1 : let mut done = false;
699 2 : for _ in 0..5 {
700 2 : if handle.current_position().is_none() {
701 1 : done = true;
702 1 : break;
703 1 : }
704 1 : tokio::time::sleep(Duration::from_millis(1)).await;
705 1 : }
706 1 :
707 1 : assert!(done);
708 1 : }
709 :
710 : #[tokio::test]
711 1 : async fn test_interpreted_wal_reader_same_shard_fanout() {
712 1 : let _ = env_logger::builder().is_test(true).try_init();
713 1 :
714 1 : const SIZE: usize = 8 * 1024;
715 1 : const MSG_COUNT: usize = 200;
716 1 : const PG_VERSION: u32 = 17;
717 1 : const SHARD_COUNT: u8 = 2;
718 1 : const ATTACHED_SHARDS: u8 = 4;
719 1 :
720 1 : let start_lsn = Lsn::from_str("0/149FD18").unwrap();
721 1 : let env = Env::new(true).unwrap();
722 1 : let tli = env
723 1 : .make_timeline(NodeId(1), TenantTimelineId::generate(), start_lsn)
724 1 : .await
725 1 : .unwrap();
726 1 :
727 1 : let resident_tli = tli.wal_residence_guard().await.unwrap();
728 1 : let end_watch = Env::write_wal(tli, start_lsn, SIZE, MSG_COUNT)
729 1 : .await
730 1 : .unwrap();
731 1 : let end_pos = end_watch.get();
732 1 :
733 1 : let streaming_wal_reader = StreamingWalReader::new(
734 1 : resident_tli,
735 1 : None,
736 1 : start_lsn,
737 1 : end_pos,
738 1 : end_watch,
739 1 : MAX_SEND_SIZE,
740 1 : );
741 1 :
742 1 : let shard_0 = ShardIdentity::new(
743 1 : ShardNumber(0),
744 1 : ShardCount(SHARD_COUNT),
745 1 : ShardStripeSize::default(),
746 1 : )
747 1 : .unwrap();
748 1 :
749 1 : let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
750 1 : let mut batch_receivers = vec![rx];
751 1 :
752 1 : let handle = InterpretedWalReader::spawn(
753 1 : streaming_wal_reader,
754 1 : start_lsn,
755 1 : tx,
756 1 : shard_0,
757 1 : PG_VERSION,
758 1 : &Some("pageserver".to_string()),
759 1 : );
760 1 :
761 3 : for _ in 0..(ATTACHED_SHARDS - 1) {
762 3 : let (tx, rx) = tokio::sync::mpsc::channel::<Batch>(MSG_COUNT * 2);
763 3 : handle.fanout(shard_0, tx, start_lsn).unwrap();
764 3 : batch_receivers.push(rx);
765 3 : }
766 1 :
767 1 : loop {
768 13 : let batch = batch_receivers.first_mut().unwrap().recv().await.unwrap();
769 39 : for rx in batch_receivers.iter_mut().skip(1) {
770 39 : let other_batch = rx.recv().await.unwrap();
771 39 :
772 39 : assert_eq!(batch.wal_end_lsn, other_batch.wal_end_lsn);
773 39 : assert_eq!(
774 39 : batch.available_wal_end_lsn,
775 39 : other_batch.available_wal_end_lsn
776 39 : );
777 1 : }
778 1 :
779 13 : if batch.wal_end_lsn == batch.available_wal_end_lsn {
780 1 : break;
781 12 : }
782 1 : }
783 1 :
784 1 : handle.abort();
785 1 : let mut done = false;
786 2 : for _ in 0..5 {
787 2 : if handle.current_position().is_none() {
788 1 : done = true;
789 1 : break;
790 1 : }
791 1 : tokio::time::sleep(Duration::from_millis(1)).await;
792 1 : }
793 1 :
794 1 : assert!(done);
795 1 : }
796 : }
|