Line data Source code
1 : mod downloader;
2 : pub mod heatmap;
3 : mod heatmap_uploader;
4 : mod scheduler;
5 :
6 : use std::{sync::Arc, time::SystemTime};
7 :
8 : use crate::{
9 : config::PageServerConf,
10 : context::RequestContext,
11 : disk_usage_eviction_task::DiskUsageEvictionInfo,
12 : task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
13 : virtual_file::MaybeFatalIo,
14 : };
15 :
16 : use self::{
17 : downloader::{downloader_task, SecondaryDetail},
18 : heatmap_uploader::heatmap_uploader_task,
19 : };
20 :
21 : use super::{
22 : config::{SecondaryLocationConfig, TenantConfOpt},
23 : mgr::TenantManager,
24 : remote_timeline_client::LayerFileMetadata,
25 : span::debug_assert_current_span_has_tenant_id,
26 : storage_layer::{layer::local_layer_path, LayerName},
27 : };
28 :
29 : use pageserver_api::{
30 : models,
31 : shard::{ShardIdentity, TenantShardId},
32 : };
33 : use remote_storage::GenericRemoteStorage;
34 :
35 : use tokio_util::sync::CancellationToken;
36 : use tracing::instrument;
37 : use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate};
38 :
39 : enum DownloadCommand {
40 : Download(TenantShardId),
41 : }
42 : enum UploadCommand {
43 : Upload(TenantShardId),
44 : }
45 :
46 : impl UploadCommand {
47 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
48 0 : match self {
49 0 : Self::Upload(id) => id,
50 0 : }
51 0 : }
52 : }
53 :
54 : impl DownloadCommand {
55 0 : fn get_tenant_shard_id(&self) -> &TenantShardId {
56 0 : match self {
57 0 : Self::Download(id) => id,
58 0 : }
59 0 : }
60 : }
61 :
62 : struct CommandRequest<T> {
63 : payload: T,
64 : response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
65 : }
66 :
67 : struct CommandResponse {
68 : result: anyhow::Result<()>,
69 : }
70 :
71 : // Whereas [`Tenant`] represents an attached tenant, this type represents the work
72 : // we do for secondary tenant locations: where we are not serving clients or
73 : // ingesting WAL, but we are maintaining a warm cache of layer files.
74 : //
75 : // This type is all about the _download_ path for secondary mode. The upload path
76 : // runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
77 : //
78 : // This structure coordinates TenantManager and SecondaryDownloader,
79 : // so that the downloader can indicate which tenants it is currently
80 : // operating on, and the manager can indicate when a particular
81 : // secondary tenant should cancel any work in flight.
82 : #[derive(Debug)]
83 : pub(crate) struct SecondaryTenant {
84 : /// Carrying a tenant shard ID simplifies callers such as the downloader
85 : /// which need to organize many of these objects by ID.
86 : tenant_shard_id: TenantShardId,
87 :
88 : /// Cancellation token indicates to SecondaryDownloader that it should stop doing
89 : /// any work for this tenant at the next opportunity.
90 : pub(crate) cancel: CancellationToken,
91 :
92 : pub(crate) gate: Gate,
93 :
94 : // Secondary mode does not need the full shard identity or the TenantConfOpt. However,
95 : // storing these enables us to report our full LocationConf, enabling convenient reconciliation
96 : // by the control plane (see [`Self::get_location_conf`])
97 : shard_identity: ShardIdentity,
98 : tenant_conf: std::sync::Mutex<TenantConfOpt>,
99 :
100 : // Internal state used by the Downloader.
101 : detail: std::sync::Mutex<SecondaryDetail>,
102 :
103 : // Public state indicating overall progress of downloads relative to the last heatmap seen
104 : pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,
105 : }
106 :
107 : impl SecondaryTenant {
108 0 : pub(crate) fn new(
109 0 : tenant_shard_id: TenantShardId,
110 0 : shard_identity: ShardIdentity,
111 0 : tenant_conf: TenantConfOpt,
112 0 : config: &SecondaryLocationConfig,
113 0 : ) -> Arc<Self> {
114 0 : Arc::new(Self {
115 0 : tenant_shard_id,
116 0 : // todo: shall we make this a descendent of the
117 0 : // main cancellation token, or is it sufficient that
118 0 : // on shutdown we walk the tenants and fire their
119 0 : // individual cancellations?
120 0 : cancel: CancellationToken::new(),
121 0 : gate: Gate::default(),
122 0 :
123 0 : shard_identity,
124 0 : tenant_conf: std::sync::Mutex::new(tenant_conf),
125 0 :
126 0 : detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
127 0 :
128 0 : progress: std::sync::Mutex::default(),
129 0 : })
130 0 : }
131 :
132 0 : pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
133 0 : self.tenant_shard_id
134 0 : }
135 :
136 0 : pub(crate) async fn shutdown(&self) {
137 0 : self.cancel.cancel();
138 0 :
139 0 : // Wait for any secondary downloader work to complete
140 0 : self.gate.close().await;
141 0 : }
142 :
143 0 : pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
144 0 : self.detail.lock().unwrap().config = config.clone();
145 0 : }
146 :
147 0 : pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
148 0 : *(self.tenant_conf.lock().unwrap()) = config.clone();
149 0 : }
150 :
151 : /// For API access: generate a LocationConfig equivalent to the one that would be used to
152 : /// create a Tenant in the same state. Do not use this in hot paths: it's for relatively
153 : /// rare external API calls, like a reconciliation at startup.
154 0 : pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
155 0 : let conf = self.detail.lock().unwrap().config.clone();
156 0 :
157 0 : let conf = models::LocationConfigSecondary { warm: conf.warm };
158 0 :
159 0 : let tenant_conf = self.tenant_conf.lock().unwrap().clone();
160 0 : models::LocationConfig {
161 0 : mode: models::LocationConfigMode::Secondary,
162 0 : generation: None,
163 0 : secondary_conf: Some(conf),
164 0 : shard_number: self.tenant_shard_id.shard_number.0,
165 0 : shard_count: self.tenant_shard_id.shard_count.literal(),
166 0 : shard_stripe_size: self.shard_identity.stripe_size.0,
167 0 : tenant_conf: tenant_conf.into(),
168 0 : }
169 0 : }
170 :
171 0 : pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
172 0 : &self.tenant_shard_id
173 0 : }
174 :
175 0 : pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
176 0 : self.detail.lock().unwrap().get_layers_for_eviction(self)
177 0 : }
178 :
179 : /// Cancellation safe, but on cancellation the eviction will go through
180 0 : #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
181 : pub(crate) async fn evict_layer(
182 : self: &Arc<Self>,
183 : conf: &PageServerConf,
184 : timeline_id: TimelineId,
185 : name: LayerName,
186 : metadata: LayerFileMetadata,
187 : ) {
188 : debug_assert_current_span_has_tenant_id();
189 :
190 : let guard = match self.gate.enter() {
191 : Ok(g) => g,
192 : Err(_) => {
193 : tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
194 : return;
195 : }
196 : };
197 :
198 : let now = SystemTime::now();
199 :
200 : let local_path = local_layer_path(
201 : conf,
202 : &self.tenant_shard_id,
203 : &timeline_id,
204 : &name,
205 : &metadata.generation,
206 : );
207 :
208 : let this = self.clone();
209 :
210 : // spawn it to be cancellation safe
211 0 : tokio::task::spawn_blocking(move || {
212 0 : let _guard = guard;
213 0 : // We tolerate ENOENT, because between planning eviction and executing
214 0 : // it, the secondary downloader could have seen an updated heatmap that
215 0 : // resulted in a layer being deleted.
216 0 : // Other local I/O errors are process-fatal: these should never happen.
217 0 : let deleted = std::fs::remove_file(local_path);
218 0 :
219 0 : let not_found = deleted
220 0 : .as_ref()
221 0 : .is_err_and(|x| x.kind() == std::io::ErrorKind::NotFound);
222 :
223 0 : let deleted = if not_found {
224 0 : false
225 : } else {
226 0 : deleted
227 0 : .map(|()| true)
228 0 : .fatal_err("Deleting layer during eviction")
229 : };
230 :
231 0 : if !deleted {
232 : // skip updating accounting and putting perhaps later timestamp
233 0 : return;
234 0 : }
235 0 :
236 0 : // Update the timeline's state. This does not have to be synchronized with
237 0 : // the download process, because:
238 0 : // - If downloader is racing with us to remove a file (e.g. because it is
239 0 : // removed from heatmap), then our mutual .remove() operations will both
240 0 : // succeed.
241 0 : // - If downloader is racing with us to download the object (this would require
242 0 : // multiple eviction iterations to race with multiple download iterations), then
243 0 : // if we remove it from the state, the worst that happens is the downloader
244 0 : // downloads it again before re-inserting, or we delete the file but it remains
245 0 : // in the state map (in which case it will be downloaded if this secondary
246 0 : // tenant transitions to attached and tries to access it)
247 0 : //
248 0 : // The important assumption here is that the secondary timeline state does not
249 0 : // have to 100% match what is on disk, because it's a best-effort warming
250 0 : // of the cache.
251 0 : let mut detail = this.detail.lock().unwrap();
252 0 : if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
253 0 : timeline_detail.on_disk_layers.remove(&name);
254 0 : timeline_detail.evicted_at.insert(name, now);
255 0 : }
256 0 : })
257 : .await
258 : .expect("secondary eviction should not have panicked");
259 : }
260 : }
261 :
262 : /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
263 : /// and heatmap uploads. This is not a hot data path: it's used for:
264 : /// - Live migrations, where we want to ensure a migration destination has the freshest possible
265 : /// content before trying to cut over.
266 : /// - Tests, where we want to immediately upload/download for a particular tenant.
267 : ///
268 : /// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
269 : pub struct SecondaryController {
270 : upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
271 : download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
272 : }
273 :
274 : impl SecondaryController {
275 0 : async fn dispatch<T>(
276 0 : &self,
277 0 : queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
278 0 : payload: T,
279 0 : ) -> anyhow::Result<()> {
280 0 : let (response_tx, response_rx) = tokio::sync::oneshot::channel();
281 0 :
282 0 : queue
283 0 : .send(CommandRequest {
284 0 : payload,
285 0 : response_tx,
286 0 : })
287 0 : .await
288 0 : .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
289 :
290 0 : let response = response_rx
291 0 : .await
292 0 : .map_err(|_| anyhow::anyhow!("Request dropped"))?;
293 :
294 0 : response.result
295 0 : }
296 :
297 0 : pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
298 0 : self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
299 0 : .await
300 0 : }
301 0 : pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
302 0 : self.dispatch(
303 0 : &self.download_req_tx,
304 0 : DownloadCommand::Download(tenant_shard_id),
305 0 : )
306 0 : .await
307 0 : }
308 : }
309 :
310 0 : pub fn spawn_tasks(
311 0 : tenant_manager: Arc<TenantManager>,
312 0 : remote_storage: GenericRemoteStorage,
313 0 : background_jobs_can_start: Barrier,
314 0 : cancel: CancellationToken,
315 0 : ) -> SecondaryController {
316 0 : let mgr_clone = tenant_manager.clone();
317 0 : let storage_clone = remote_storage.clone();
318 0 : let cancel_clone = cancel.clone();
319 0 : let bg_jobs_clone = background_jobs_can_start.clone();
320 0 :
321 0 : let (download_req_tx, download_req_rx) =
322 0 : tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
323 0 : let (upload_req_tx, upload_req_rx) =
324 0 : tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
325 0 :
326 0 : let downloader_task_ctx = RequestContext::new(
327 0 : TaskKind::SecondaryDownloads,
328 0 : crate::context::DownloadBehavior::Download,
329 0 : );
330 0 : task_mgr::spawn(
331 0 : BACKGROUND_RUNTIME.handle(),
332 0 : downloader_task_ctx.task_kind(),
333 0 : None,
334 0 : None,
335 0 : "secondary tenant downloads",
336 0 : false,
337 0 : async move {
338 0 : downloader_task(
339 0 : mgr_clone,
340 0 : storage_clone,
341 0 : download_req_rx,
342 0 : bg_jobs_clone,
343 0 : cancel_clone,
344 0 : downloader_task_ctx,
345 0 : )
346 0 : .await;
347 :
348 0 : Ok(())
349 0 : },
350 0 : );
351 0 :
352 0 : task_mgr::spawn(
353 0 : BACKGROUND_RUNTIME.handle(),
354 0 : TaskKind::SecondaryUploads,
355 0 : None,
356 0 : None,
357 0 : "heatmap uploads",
358 0 : false,
359 0 : async move {
360 0 : heatmap_uploader_task(
361 0 : tenant_manager,
362 0 : remote_storage,
363 0 : upload_req_rx,
364 0 : background_jobs_can_start,
365 0 : cancel,
366 0 : )
367 0 : .await;
368 :
369 0 : Ok(())
370 0 : },
371 0 : );
372 0 :
373 0 : SecondaryController {
374 0 : download_req_tx,
375 0 : upload_req_tx,
376 0 : }
377 0 : }
378 :
379 : /// For running with remote storage disabled: a SecondaryController that is connected to nothing.
380 0 : pub fn null_controller() -> SecondaryController {
381 0 : let (download_req_tx, _download_req_rx) =
382 0 : tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
383 0 : let (upload_req_tx, _upload_req_rx) =
384 0 : tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
385 0 : SecondaryController {
386 0 : upload_req_tx,
387 0 : download_req_tx,
388 0 : }
389 0 : }
|