Line data Source code
1 : use std::time::Duration;
2 :
3 : use bytes::Bytes;
4 : use pageserver_api::{reltag::RelTag, shard::TenantShardId};
5 : use utils::lsn::Lsn;
6 :
7 : use crate::{config::PageServerConf, walrecord::NeonWalRecord};
8 :
9 : mod no_leak_child;
10 : /// The IPC protocol that pageserver and walredo process speak over their shared pipe.
11 : mod protocol;
12 :
13 : mod process_impl {
14 : pub(super) mod process_async;
15 : pub(super) mod process_std;
16 : }
17 :
18 : #[derive(
19 : Clone,
20 : Copy,
21 : Debug,
22 : PartialEq,
23 : Eq,
24 148 : strum_macros::EnumString,
25 0 : strum_macros::Display,
26 0 : strum_macros::IntoStaticStr,
27 0 : serde_with::DeserializeFromStr,
28 : serde_with::SerializeDisplay,
29 : )]
30 : #[strum(serialize_all = "kebab-case")]
31 : #[repr(u8)]
32 : pub enum Kind {
33 : Sync,
34 : Async,
35 : }
36 :
37 : pub(crate) enum Process {
38 : Sync(process_impl::process_std::WalRedoProcess),
39 : Async(process_impl::process_async::WalRedoProcess),
40 : }
41 :
42 : impl Process {
43 : #[inline(always)]
44 8 : pub fn launch(
45 8 : conf: &'static PageServerConf,
46 8 : tenant_shard_id: TenantShardId,
47 8 : pg_version: u32,
48 8 : ) -> anyhow::Result<Self> {
49 8 : Ok(match conf.walredo_process_kind {
50 8 : Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch(
51 8 : conf,
52 8 : tenant_shard_id,
53 8 : pg_version,
54 8 : )?),
55 0 : Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch(
56 0 : conf,
57 0 : tenant_shard_id,
58 0 : pg_version,
59 0 : )?),
60 : })
61 8 : }
62 :
63 : #[inline(always)]
64 8 : pub(crate) async fn apply_wal_records(
65 8 : &self,
66 8 : rel: RelTag,
67 8 : blknum: u32,
68 8 : base_img: &Option<Bytes>,
69 8 : records: &[(Lsn, NeonWalRecord)],
70 8 : wal_redo_timeout: Duration,
71 8 : ) -> anyhow::Result<Bytes> {
72 8 : match self {
73 8 : Process::Sync(p) => {
74 8 : p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
75 0 : .await
76 : }
77 0 : Process::Async(p) => {
78 0 : p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
79 0 : .await
80 : }
81 : }
82 8 : }
83 :
84 8 : pub(crate) fn id(&self) -> u32 {
85 8 : match self {
86 8 : Process::Sync(p) => p.id(),
87 0 : Process::Async(p) => p.id(),
88 : }
89 8 : }
90 :
91 0 : pub(crate) fn kind(&self) -> Kind {
92 0 : match self {
93 0 : Process::Sync(_) => Kind::Sync,
94 0 : Process::Async(_) => Kind::Async,
95 : }
96 0 : }
97 : }
|