mod downloader;
pub mod heatmap;
mod heatmap_uploader;
mod scheduler;

use std::{sync::Arc, time::SystemTime};

use crate::{
    context::RequestContext,
    disk_usage_eviction_task::DiskUsageEvictionInfo,
    task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
};

use self::{
    downloader::{downloader_task, SecondaryDetail},
    heatmap_uploader::heatmap_uploader_task,
};

use super::{
    config::{SecondaryLocationConfig, TenantConfOpt},
    mgr::TenantManager,
    span::debug_assert_current_span_has_tenant_id,
    storage_layer::LayerName,
};

use crate::metrics::SECONDARY_RESIDENT_PHYSICAL_SIZE;
use metrics::UIntGauge;
use pageserver_api::{
    models,
    shard::{ShardIdentity, TenantShardId},
};
use remote_storage::GenericRemoteStorage;

use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate};

enum DownloadCommand {
    Download(TenantShardId),
}
enum UploadCommand {
    Upload(TenantShardId),
}

impl UploadCommand {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        match self {
            Self::Upload(id) => id,
        }
    }
}

impl DownloadCommand {
    fn get_tenant_shard_id(&self) -> &TenantShardId {
        match self {
            Self::Download(id) => id,
        }
    }
}

struct CommandRequest<T> {
    payload: T,
    response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}

struct CommandResponse {
    result: anyhow::Result<()>,
}

// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode. The upload path
// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
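//
// A rough lifecycle sketch (illustrative only; the variable names are assumptions and the
// real call sites live in the tenant manager and the downloader):
//
//     let secondary = SecondaryTenant::new(tenant_shard_id, shard_identity, tenant_conf, &location_config);
//     // ... the downloader holds `gate.enter()` guards while it works and updates `detail` ...
//     secondary.shutdown().await; // cancel, then wait for in-flight downloader work to drain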
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
    /// Carrying a tenant shard ID simplifies callers such as the downloader
    /// which need to organize many of these objects by ID.
    tenant_shard_id: TenantShardId,

    /// Cancellation token indicates to SecondaryDownloader that it should stop doing
    /// any work for this tenant at the next opportunity.
    pub(crate) cancel: CancellationToken,

    pub(crate) gate: Gate,

    // Secondary mode does not need the full shard identity or the TenantConfOpt. However,
    // storing these enables us to report our full LocationConf, enabling convenient reconciliation
    // by the control plane (see [`Self::get_location_conf`])
    shard_identity: ShardIdentity,
    tenant_conf: std::sync::Mutex<TenantConfOpt>,

    // Internal state used by the Downloader.
    detail: std::sync::Mutex<SecondaryDetail>,

    // Public state indicating overall progress of downloads relative to the last heatmap seen
    pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,

    // Sum of layer sizes on local disk
    pub(super) resident_size_metric: UIntGauge,
}

impl Drop for SecondaryTenant {
    fn drop(&mut self) {
        let tenant_id = self.tenant_shard_id.tenant_id.to_string();
        let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
        let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
    }
}

impl SecondaryTenant {
    pub(crate) fn new(
        tenant_shard_id: TenantShardId,
        shard_identity: ShardIdentity,
        tenant_conf: TenantConfOpt,
        config: &SecondaryLocationConfig,
    ) -> Arc<Self> {
        let tenant_id = tenant_shard_id.tenant_id.to_string();
        let shard_id = format!("{}", tenant_shard_id.shard_slug());
        let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
            .get_metric_with_label_values(&[&tenant_id, &shard_id])
            .unwrap();

        Arc::new(Self {
            tenant_shard_id,
            // TODO: shall we make this a descendant of the
            // main cancellation token, or is it sufficient that
            // on shutdown we walk the tenants and fire their
            // individual cancellations?
            cancel: CancellationToken::new(),
            gate: Gate::default(),

            shard_identity,
            tenant_conf: std::sync::Mutex::new(tenant_conf),

            detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),

            progress: std::sync::Mutex::default(),

            resident_size_metric,
        })
    }

    pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
        self.tenant_shard_id
    }

    pub(crate) async fn shutdown(&self) {
        self.cancel.cancel();

        // Wait for any secondary downloader work to complete
        self.gate.close().await;
    }

    pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
        self.detail.lock().unwrap().config = config.clone();
    }

    pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
        *(self.tenant_conf.lock().unwrap()) = config.clone();
    }

    /// For API access: generate a LocationConfig equivalent to the one that would be used to
    /// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
    /// rare external API calls, like a reconciliation at startup.
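    ///
    /// Illustrative sketch only (the `secondary_tenant` binding is an assumed name, not a real
    /// call site):
    ///
    /// ```ignore
    /// let conf = secondary_tenant.get_location_conf();
    /// assert!(matches!(conf.mode, models::LocationConfigMode::Secondary));
    /// assert!(conf.secondary_conf.is_some());
    /// ```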
    pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
        let conf = self.detail.lock().unwrap().config.clone();

        let conf = models::LocationConfigSecondary { warm: conf.warm };

        let tenant_conf = self.tenant_conf.lock().unwrap().clone();
        models::LocationConfig {
            mode: models::LocationConfigMode::Secondary,
            generation: None,
            secondary_conf: Some(conf),
            shard_number: self.tenant_shard_id.shard_number.0,
            shard_count: self.tenant_shard_id.shard_count.literal(),
            shard_stripe_size: self.shard_identity.stripe_size.0,
            tenant_conf: tenant_conf.into(),
        }
    }

    pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }

    pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
        self.detail.lock().unwrap().get_layers_for_eviction(self)
    }

    /// Cancellation safe: if the caller's future is dropped, the eviction still runs to completion.
    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
    pub(crate) async fn evict_layer(self: &Arc<Self>, timeline_id: TimelineId, name: LayerName) {
        debug_assert_current_span_has_tenant_id();

        let guard = match self.gate.enter() {
            Ok(g) => g,
            Err(_) => {
                tracing::debug!("Dropping layer evictions, secondary tenant shutting down");
                return;
            }
        };

        let now = SystemTime::now();
        tracing::info!("Evicting secondary layer");

        let this = self.clone();

        // Spawn the eviction onto a blocking task so that it is cancellation safe: even if our
        // caller's future is dropped, the spawned work completes.
        tokio::task::spawn_blocking(move || {
            let _guard = guard;

            // Update the timeline's state. This does not have to be synchronized with
            // the download process, because:
            // - If the downloader is racing with us to remove a file (e.g. because it is
            //   removed from the heatmap), then our mutual .remove() operations will both
            //   succeed.
            // - If the downloader is racing with us to download the object (this would require
            //   multiple eviction iterations to race with multiple download iterations), then
            //   if we remove it from the state, the worst that happens is the downloader
            //   downloads it again before re-inserting, or we delete the file but it remains
            //   in the state map (in which case it will be downloaded if this secondary
            //   tenant transitions to attached and tries to access it).
            //
            // The important assumption here is that the secondary timeline state does not
            // have to 100% match what is on disk, because it's a best-effort warming
            // of the cache.
            let mut detail = this.detail.lock().unwrap();
            if let Some(removed) =
                detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
            {
                // We might race with removal of the same layer during downloads, so finding the layer we
                // were trying to remove is optional. Only issue the disk I/O to remove it if we found it.
                removed.remove_blocking();
            }
        })
        .await
        .expect("secondary eviction should not have panicked");
    }
}

/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads
/// and heatmap uploads. This is not a hot data path; it is used for:
/// - Live migrations, where we want to ensure a migration destination has the freshest possible
///   content before trying to cut over.
/// - Tests, where we want to immediately upload/download for a particular tenant.
///
/// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
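///
/// An illustrative sketch (assumed names, not a real call site) of driving an on-demand
/// download before a migration cut-over:
///
/// ```ignore
/// async fn warm_migration_destination(
///     controller: &SecondaryController,
///     tenant_shard_id: TenantShardId,
/// ) -> anyhow::Result<()> {
///     // Asks the downloader task to refresh this shard's layers from the latest heatmap,
///     // then waits for the command's result on the oneshot response channel.
///     controller.download_tenant(tenant_shard_id).await
/// }
/// ```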
pub struct SecondaryController {
    upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
    download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}

impl SecondaryController {
    async fn dispatch<T>(
        &self,
        queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
        payload: T,
    ) -> anyhow::Result<()> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();

        queue
            .send(CommandRequest {
                payload,
                response_tx,
            })
            .await
            .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;

        let response = response_rx
            .await
            .map_err(|_| anyhow::anyhow!("Request dropped"))?;

        response.result
    }

    pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
            .await
    }

    pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
        self.dispatch(
            &self.download_req_tx,
            DownloadCommand::Download(tenant_shard_id),
        )
        .await
    }
}

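/// Spawn the long-running background tasks that drive secondary mode: the layer downloader
/// and the heatmap uploader. Returns a [`SecondaryController`] for sending them on-demand
/// commands.
///
/// A rough sketch of how this might be wired up at pageserver startup (illustrative only;
/// the surrounding variable names are assumptions, not real call-site code):
///
/// ```ignore
/// let secondary_controller = secondary::spawn_tasks(
///     tenant_manager.clone(),      // Arc<TenantManager>
///     remote_storage.clone(),      // GenericRemoteStorage
///     background_jobs_barrier,     // Barrier released once startup is complete
///     shutdown_token.clone(),      // CancellationToken fired on process shutdown
/// );
/// ```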
pub fn spawn_tasks(
    tenant_manager: Arc<TenantManager>,
    remote_storage: GenericRemoteStorage,
    background_jobs_can_start: Barrier,
    cancel: CancellationToken,
) -> SecondaryController {
    let mgr_clone = tenant_manager.clone();
    let storage_clone = remote_storage.clone();
    let cancel_clone = cancel.clone();
    let bg_jobs_clone = background_jobs_can_start.clone();

    let (download_req_tx, download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);

    let downloader_task_ctx = RequestContext::new(
        TaskKind::SecondaryDownloads,
        crate::context::DownloadBehavior::Download,
    );
    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        downloader_task_ctx.task_kind(),
        None,
        None,
        "secondary tenant downloads",
        false,
        async move {
            downloader_task(
                mgr_clone,
                storage_clone,
                download_req_rx,
                bg_jobs_clone,
                cancel_clone,
                downloader_task_ctx,
            )
            .await;

            Ok(())
        },
    );

    task_mgr::spawn(
        BACKGROUND_RUNTIME.handle(),
        TaskKind::SecondaryUploads,
        None,
        None,
        "heatmap uploads",
        false,
        async move {
            heatmap_uploader_task(
                tenant_manager,
                remote_storage,
                upload_req_rx,
                background_jobs_can_start,
                cancel,
            )
            .await;

            Ok(())
        },
    );

    SecondaryController {
        download_req_tx,
        upload_req_tx,
    }
}

/// For running with remote storage disabled: a SecondaryController that is connected to nothing.
pub fn null_controller() -> SecondaryController {
    let (download_req_tx, _download_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
    let (upload_req_tx, _upload_req_rx) =
        tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
    SecondaryController {
        upload_req_tx,
        download_req_tx,
    }
}