TLA Line data Source code
1 : mod downloader;
2 : pub mod heatmap;
3 : mod heatmap_uploader;
4 : mod scheduler;
5 :
6 : use std::sync::Arc;
7 :
8 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
9 :
10 : use self::{
11 : downloader::{downloader_task, SecondaryDetail},
12 : heatmap_uploader::heatmap_uploader_task,
13 : };
14 :
15 : use super::{config::SecondaryLocationConfig, mgr::TenantManager};
16 :
17 : use pageserver_api::shard::TenantShardId;
18 : use remote_storage::GenericRemoteStorage;
19 :
20 : use tokio_util::sync::CancellationToken;
21 : use utils::{completion::Barrier, sync::gate::Gate};
22 :
23 : enum DownloadCommand {
24 : Download(TenantShardId),
25 : }
26 : enum UploadCommand {
27 : Upload(TenantShardId),
28 : }
29 :
30 : impl UploadCommand {
31 CBC 6 : fn get_tenant_shard_id(&self) -> &TenantShardId {
32 6 : match self {
33 6 : Self::Upload(id) => id,
34 6 : }
35 6 : }
36 : }
37 :
38 : impl DownloadCommand {
39 4 : fn get_tenant_shard_id(&self) -> &TenantShardId {
40 4 : match self {
41 4 : Self::Download(id) => id,
42 4 : }
43 4 : }
44 : }
45 :
46 : struct CommandRequest<T> {
47 : payload: T,
48 : response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
49 : }
50 :
51 : struct CommandResponse {
52 : result: anyhow::Result<()>,
53 : }
54 :
55 : // Whereas [`Tenant`] represents an attached tenant, this type represents the work
56 : // we do for secondary tenant locations: where we are not serving clients or
57 : // ingesting WAL, but we are maintaining a warm cache of layer files.
58 : //
59 : // This type is all about the _download_ path for secondary mode. The upload path
60 : // runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
61 : //
62 : // This structure coordinates TenantManager and SecondaryDownloader,
63 : // so that the downloader can indicate which tenants it is currently
64 : // operating on, and the manager can indicate when a particular
65 : // secondary tenant should cancel any work in flight.
66 UBC 0 : #[derive(Debug)]
67 : pub(crate) struct SecondaryTenant {
68 : /// Carrying a tenant shard ID simplifies callers such as the downloader
69 : /// which need to organize many of these objects by ID.
70 : tenant_shard_id: TenantShardId,
71 :
72 : /// Cancellation token indicates to SecondaryDownloader that it should stop doing
73 : /// any work for this tenant at the next opportunity.
74 : pub(crate) cancel: CancellationToken,
75 :
76 : pub(crate) gate: Gate,
77 :
78 : detail: std::sync::Mutex<SecondaryDetail>,
79 : }
80 :
81 : impl SecondaryTenant {
82 CBC 27 : pub(crate) fn new(
83 27 : tenant_shard_id: TenantShardId,
84 27 : config: &SecondaryLocationConfig,
85 27 : ) -> Arc<Self> {
86 27 : Arc::new(Self {
87 27 : tenant_shard_id,
88 27 : // todo: shall we make this a descendent of the
89 27 : // main cancellation token, or is it sufficient that
90 27 : // on shutdown we walk the tenants and fire their
91 27 : // individual cancellations?
92 27 : cancel: CancellationToken::new(),
93 27 : gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
94 27 :
95 27 : detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
96 27 : })
97 27 : }
98 :
99 21 : pub(crate) async fn shutdown(&self) {
100 21 : self.cancel.cancel();
101 21 :
102 21 : // Wait for any secondary downloader work to complete
103 21 : self.gate.close().await;
104 21 : }
105 :
106 6 : pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
107 6 : self.detail.lock().unwrap().config = config.clone();
108 6 : }
109 :
110 65 : fn get_tenant_shard_id(&self) -> &TenantShardId {
111 65 : &self.tenant_shard_id
112 65 : }
113 : }
114 :
115 : /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
116 : /// and heatmap uploads. This is not a hot data path: it's primarily a hook for tests,
117 : /// where we want to immediately upload/download for a particular tenant. In normal operation
118 : /// uploads & downloads are autonomous and not driven by this interface.
119 : pub struct SecondaryController {
120 : upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
121 : download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
122 : }
123 :
124 : impl SecondaryController {
125 10 : async fn dispatch<T>(
126 10 : &self,
127 10 : queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
128 10 : payload: T,
129 10 : ) -> anyhow::Result<()> {
130 10 : let (response_tx, response_rx) = tokio::sync::oneshot::channel();
131 10 :
132 10 : queue
133 10 : .send(CommandRequest {
134 10 : payload,
135 10 : response_tx,
136 10 : })
137 UBC 0 : .await
138 CBC 10 : .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
139 :
140 10 : let response = response_rx
141 10 : .await
142 10 : .map_err(|_| anyhow::anyhow!("Request dropped"))?;
143 :
144 10 : response.result
145 10 : }
146 :
147 6 : pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
148 6 : self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
149 6 : .await
150 6 : }
151 4 : pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
152 4 : self.dispatch(
153 4 : &self.download_req_tx,
154 4 : DownloadCommand::Download(tenant_shard_id),
155 4 : )
156 4 : .await
157 4 : }
158 : }
159 :
160 557 : pub fn spawn_tasks(
161 557 : tenant_manager: Arc<TenantManager>,
162 557 : remote_storage: GenericRemoteStorage,
163 557 : background_jobs_can_start: Barrier,
164 557 : cancel: CancellationToken,
165 557 : ) -> SecondaryController {
166 557 : let mgr_clone = tenant_manager.clone();
167 557 : let storage_clone = remote_storage.clone();
168 557 : let cancel_clone = cancel.clone();
169 557 : let bg_jobs_clone = background_jobs_can_start.clone();
170 557 :
171 557 : let (download_req_tx, download_req_rx) =
172 557 : tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
173 557 : let (upload_req_tx, upload_req_rx) =
174 557 : tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
175 557 :
176 557 : task_mgr::spawn(
177 557 : BACKGROUND_RUNTIME.handle(),
178 557 : TaskKind::SecondaryDownloads,
179 557 : None,
180 557 : None,
181 557 : "secondary tenant downloads",
182 557 : false,
183 557 : async move {
184 557 : downloader_task(
185 557 : mgr_clone,
186 557 : storage_clone,
187 557 : download_req_rx,
188 557 : bg_jobs_clone,
189 557 : cancel_clone,
190 557 : )
191 907 : .await;
192 :
193 159 : Ok(())
194 557 : },
195 557 : );
196 557 :
197 557 : task_mgr::spawn(
198 557 : BACKGROUND_RUNTIME.handle(),
199 557 : TaskKind::SecondaryUploads,
200 557 : None,
201 557 : None,
202 557 : "heatmap uploads",
203 557 : false,
204 557 : async move {
205 557 : heatmap_uploader_task(
206 557 : tenant_manager,
207 557 : remote_storage,
208 557 : upload_req_rx,
209 557 : background_jobs_can_start,
210 557 : cancel,
211 557 : )
212 908 : .await;
213 :
214 159 : Ok(())
215 557 : },
216 557 : );
217 557 :
218 557 : SecondaryController {
219 557 : download_req_tx,
220 557 : upload_req_tx,
221 557 : }
222 557 : }
223 :
224 : /// For running with remote storage disabled: a SecondaryController that is connected to nothing.
225 UBC 0 : pub fn null_controller() -> SecondaryController {
226 0 : let (download_req_tx, _download_req_rx) =
227 0 : tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
228 0 : let (upload_req_tx, _upload_req_rx) =
229 0 : tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
230 0 : SecondaryController {
231 0 : upload_req_tx,
232 0 : download_req_tx,
233 0 : }
234 0 : }
|