|             Line data    Source code 
       1              : mod downloader;
       2              : pub mod heatmap;
       3              : mod heatmap_uploader;
       4              : mod scheduler;
       5              : 
       6              : use std::sync::Arc;
       7              : use std::time::SystemTime;
       8              : 
       9              : use metrics::UIntGauge;
      10              : use pageserver_api::models;
      11              : use pageserver_api::shard::{ShardIdentity, TenantShardId};
      12              : use remote_storage::GenericRemoteStorage;
      13              : use tokio::task::JoinHandle;
      14              : use tokio_util::sync::CancellationToken;
      15              : use tracing::instrument;
      16              : use utils::completion::Barrier;
      17              : use utils::id::TimelineId;
      18              : use utils::sync::gate::Gate;
      19              : 
      20              : use self::downloader::{SecondaryDetail, downloader_task};
      21              : use self::heatmap_uploader::heatmap_uploader_task;
      22              : use super::GetTenantError;
      23              : use super::config::SecondaryLocationConfig;
      24              : use super::mgr::TenantManager;
      25              : use super::span::debug_assert_current_span_has_tenant_id;
      26              : use super::storage_layer::LayerName;
      27              : use crate::context::RequestContext;
      28              : use crate::disk_usage_eviction_task::DiskUsageEvictionInfo;
      29              : use crate::metrics::{SECONDARY_HEATMAP_TOTAL_SIZE, SECONDARY_RESIDENT_PHYSICAL_SIZE};
      30              : use crate::task_mgr::{self, BACKGROUND_RUNTIME, TaskKind};
      31              : 
/// Command carried on the download request queue, whose receiving end is handed to
/// `downloader_task` in [`spawn_tasks`].
enum DownloadCommand {
    /// Built by [`SecondaryController::download_tenant`] for the given tenant shard.
    Download(TenantShardId),
}
/// Command carried on the upload request queue, whose receiving end is handed to
/// `heatmap_uploader_task` in [`spawn_tasks`].
enum UploadCommand {
    /// Built by [`SecondaryController::upload_tenant`] for the given tenant shard.
    Upload(TenantShardId),
}
      38              : 
      39              : impl UploadCommand {
      40            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      41            0 :         match self {
      42            0 :             Self::Upload(id) => id,
      43              :         }
      44            0 :     }
      45              : }
      46              : 
      47              : impl DownloadCommand {
      48            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      49            0 :         match self {
      50            0 :             Self::Download(id) => id,
      51              :         }
      52            0 :     }
      53              : }
      54              : 
/// Envelope placed on a command queue by [`SecondaryController::dispatch`]: the command
/// itself plus the channel on which its outcome is reported back.
struct CommandRequest<T> {
    /// The command to act on (an [`UploadCommand`] or a [`DownloadCommand`]).
    payload: T,
    /// Sending half of a one-shot channel; `dispatch` awaits the receiving half to learn
    /// the result, and treats a dropped sender as "shutting down".
    response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
}
      59              : 
/// Reply sent back over [`CommandRequest::response_tx`].
struct CommandResponse {
    /// Outcome of the command; returned verbatim to the caller of
    /// [`SecondaryController::dispatch`].
    result: Result<(), SecondaryTenantError>,
}
      63              : 
/// Error type reported for commands issued through [`SecondaryController`].
#[derive(thiserror::Error, Debug)]
pub(crate) enum SecondaryTenantError {
    /// Wraps a [`GetTenantError`]; the `Display` output is that of the inner error.
    #[error("{0}")]
    GetTenant(GetTenantError),
    /// Produced by `dispatch` when the command queue is closed or the response
    /// channel is dropped without a reply.
    #[error("shutting down")]
    ShuttingDown,
}
      71              : 
      72              : impl From<GetTenantError> for SecondaryTenantError {
      73            0 :     fn from(gte: GetTenantError) -> Self {
      74            0 :         Self::GetTenant(gte)
      75            0 :     }
      76              : }
      77              : 
// Whereas [`Tenant`] represents an attached tenant, this type represents the work
// we do for secondary tenant locations: where we are not serving clients or
// ingesting WAL, but we are maintaining a warm cache of layer files.
//
// This type is all about the _download_ path for secondary mode.  The upload path
// runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
//
// This structure coordinates TenantManager and SecondaryDownloader,
// so that the downloader can indicate which tenants it is currently
// operating on, and the manager can indicate when a particular
// secondary tenant should cancel any work in flight.
#[derive(Debug)]
pub(crate) struct SecondaryTenant {
    /// Carrying a tenant shard ID simplifies callers such as the downloader
    /// which need to organize many of these objects by ID.
    tenant_shard_id: TenantShardId,

    /// Cancellation token indicates to SecondaryDownloader that it should stop doing
    /// any work for this tenant at the next opportunity.  Fired by [`Self::shutdown`].
    pub(crate) cancel: CancellationToken,

    /// Entered by work done on behalf of this tenant (for example [`Self::evict_layer`]).
    /// [`Self::shutdown`] closes it and waits for that to complete.
    pub(crate) gate: Gate,

    // Secondary mode does not need the full shard identity or the pageserver_api::models::TenantConfig.  However,
    // storing these enables us to report our full LocationConf, enabling convenient reconciliation
    // by the control plane (see [`Self::get_location_conf`])
    pub(crate) shard_identity: ShardIdentity,
    // Replaced wholesale by [`Self::set_tenant_conf`].
    tenant_conf: std::sync::Mutex<pageserver_api::models::TenantConfig>,

    // Internal state used by the Downloader.  Also holds the secondary location config
    // (see [`Self::set_config`]).
    detail: std::sync::Mutex<SecondaryDetail>,

    // Public state indicating overall progress of downloads relative to the last heatmap seen
    pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,

    // Sum of layer sizes on local disk
    pub(super) resident_size_metric: UIntGauge,

    // Sum of layer sizes in the most recently downloaded heatmap
    pub(super) heatmap_total_size_metric: UIntGauge,
}
     119              : 
     120              : impl SecondaryTenant {
     121            0 :     pub(crate) fn new(
     122            0 :         tenant_shard_id: TenantShardId,
     123            0 :         shard_identity: ShardIdentity,
     124            0 :         tenant_conf: pageserver_api::models::TenantConfig,
     125            0 :         config: &SecondaryLocationConfig,
     126            0 :     ) -> Arc<Self> {
     127            0 :         let tenant_id = tenant_shard_id.tenant_id.to_string();
     128            0 :         let shard_id = format!("{}", tenant_shard_id.shard_slug());
     129            0 :         let resident_size_metric = SECONDARY_RESIDENT_PHYSICAL_SIZE
     130            0 :             .get_metric_with_label_values(&[&tenant_id, &shard_id])
     131            0 :             .unwrap();
     132              : 
     133            0 :         let heatmap_total_size_metric = SECONDARY_HEATMAP_TOTAL_SIZE
     134            0 :             .get_metric_with_label_values(&[&tenant_id, &shard_id])
     135            0 :             .unwrap();
     136              : 
     137            0 :         Arc::new(Self {
     138            0 :             tenant_shard_id,
     139            0 :             // todo: shall we make this a descendent of the
     140            0 :             // main cancellation token, or is it sufficient that
     141            0 :             // on shutdown we walk the tenants and fire their
     142            0 :             // individual cancellations?
     143            0 :             cancel: CancellationToken::new(),
     144            0 :             gate: Gate::default(),
     145            0 : 
     146            0 :             shard_identity,
     147            0 :             tenant_conf: std::sync::Mutex::new(tenant_conf),
     148            0 : 
     149            0 :             detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
     150            0 : 
     151            0 :             progress: std::sync::Mutex::default(),
     152            0 : 
     153            0 :             resident_size_metric,
     154            0 :             heatmap_total_size_metric,
     155            0 :         })
     156            0 :     }
     157              : 
    /// Returns a copy of this tenant's shard ID.
    pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
        self.tenant_shard_id
    }
     161              : 
     162            0 :     pub(crate) async fn shutdown(&self) {
     163            0 :         self.cancel.cancel();
     164              : 
     165              :         // Wait for any secondary downloader work to complete
     166            0 :         self.gate.close().await;
     167              : 
     168            0 :         self.validate_metrics();
     169              : 
     170              :         // Metrics are subtracted from and/or removed eagerly.
     171              :         // Deletions are done in the background via [`BackgroundPurges::spawn`].
     172            0 :         let tenant_id = self.tenant_shard_id.tenant_id.to_string();
     173            0 :         let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
     174            0 :         let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
     175            0 :         let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
     176              : 
     177            0 :         self.detail
     178            0 :             .lock()
     179            0 :             .unwrap()
     180            0 :             .drain_timelines(&self.tenant_shard_id, &self.resident_size_metric);
     181            0 :     }
     182              : 
     183            0 :     pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
     184            0 :         self.detail.lock().unwrap().config = config.clone();
     185            0 :     }
     186              : 
     187            0 :     pub(crate) fn set_tenant_conf(&self, config: &pageserver_api::models::TenantConfig) {
     188            0 :         *(self.tenant_conf.lock().unwrap()) = config.clone();
     189            0 :     }
     190              : 
     191              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
     192              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
     193              :     /// rare external API calls, like a reconciliation at startup.
     194            0 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
     195            0 :         let conf = self.detail.lock().unwrap().config.clone();
     196              : 
     197            0 :         let conf = models::LocationConfigSecondary { warm: conf.warm };
     198              : 
     199            0 :         let tenant_conf = self.tenant_conf.lock().unwrap().clone();
     200            0 :         models::LocationConfig {
     201            0 :             mode: models::LocationConfigMode::Secondary,
     202            0 :             generation: None,
     203            0 :             secondary_conf: Some(conf),
     204            0 :             shard_number: self.tenant_shard_id.shard_number.0,
     205            0 :             shard_count: self.tenant_shard_id.shard_count.literal(),
     206            0 :             shard_stripe_size: self.shard_identity.stripe_size.0,
     207            0 :             tenant_conf,
     208            0 :         }
     209            0 :     }
     210              : 
    /// Borrows this tenant's shard ID (see [`Self::tenant_shard_id`] for the by-value form).
    pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
        &self.tenant_shard_id
    }
     214              : 
    /// Locks the downloader detail and delegates to its `get_layers_for_eviction`,
    /// passing this tenant along.
    ///
    /// NOTE(review): the meaning of the `usize` in the returned tuple is defined by the
    /// callee and is not visible here -- confirm against `SecondaryDetail`.
    pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
        self.detail.lock().unwrap().get_layers_for_eviction(self)
    }
     218              : 
     219              :     /// Cancellation safe, but on cancellation the eviction will go through
     220              :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
     221              :     pub(crate) async fn evict_layer(self: &Arc<Self>, timeline_id: TimelineId, name: LayerName) {
     222              :         debug_assert_current_span_has_tenant_id();
     223              : 
     224              :         let guard = match self.gate.enter() {
     225              :             Ok(g) => g,
     226              :             Err(_) => {
     227              :                 tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
     228              :                 return;
     229              :             }
     230              :         };
     231              : 
     232              :         let now = SystemTime::now();
     233              :         tracing::info!("Evicting secondary layer");
     234              : 
     235              :         let this = self.clone();
     236              : 
     237              :         // spawn it to be cancellation safe
     238            0 :         tokio::task::spawn_blocking(move || {
     239            0 :             let _guard = guard;
     240              : 
     241              :             // Update the timeline's state.  This does not have to be synchronized with
     242              :             // the download process, because:
     243              :             // - If downloader is racing with us to remove a file (e.g. because it is
     244              :             //   removed from heatmap), then our mutual .remove() operations will both
     245              :             //   succeed.
     246              :             // - If downloader is racing with us to download the object (this would require
     247              :             //   multiple eviction iterations to race with multiple download iterations), then
     248              :             //   if we remove it from the state, the worst that happens is the downloader
     249              :             //   downloads it again before re-inserting, or we delete the file but it remains
     250              :             //   in the state map (in which case it will be downloaded if this secondary
     251              :             //   tenant transitions to attached and tries to access it)
     252              :             //
     253              :             // The important assumption here is that the secondary timeline state does not
     254              :             // have to 100% match what is on disk, because it's a best-effort warming
     255              :             // of the cache.
     256            0 :             let mut detail = this.detail.lock().unwrap();
     257            0 :             if let Some(removed) =
     258            0 :                 detail.evict_layer(name, &timeline_id, now, &this.resident_size_metric)
     259            0 :             {
     260            0 :                 // We might race with removal of the same layer during downloads, so finding the layer we
     261            0 :                 // were trying to remove is optional.  Only issue the disk I/O to remove it if we found it.
     262            0 :                 removed.remove_blocking();
     263            0 :             }
     264            0 :         })
     265              :         .await
     266              :         .expect("secondary eviction should not have panicked");
     267              :     }
     268              : 
     269              :     /// Exhaustive check that incrementally updated metrics match the actual state.
     270              :     #[cfg(feature = "testing")]
     271            0 :     fn validate_metrics(&self) {
     272            0 :         let detail = self.detail.lock().unwrap();
     273            0 :         let resident_size = detail.total_resident_size();
     274              : 
     275            0 :         assert_eq!(resident_size, self.resident_size_metric.get());
     276            0 :     }
     277              : 
     278              :     #[cfg(not(feature = "testing"))]
     279              :     fn validate_metrics(&self) {
     280              :         // No-op in non-testing builds
     281              :     }
     282              : }
     283              : 
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
/// and heatmap uploads.  This is not a hot data path: it's used for:
/// - Live migrations, where we want to ensure a migration destination has the freshest possible
///   content before trying to cut over.
/// - Tests, where we want to immediately upload/download for a particular tenant.
///
/// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
pub struct SecondaryController {
    /// Queue of upload commands; the receiving end is owned by the heatmap uploader task.
    upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
    /// Queue of download commands; the receiving end is owned by the downloader task.
    download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
}
     295              : 
     296              : impl SecondaryController {
     297            0 :     async fn dispatch<T>(
     298            0 :         &self,
     299            0 :         queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
     300            0 :         payload: T,
     301            0 :     ) -> Result<(), SecondaryTenantError> {
     302            0 :         let (response_tx, response_rx) = tokio::sync::oneshot::channel();
     303              : 
     304            0 :         queue
     305            0 :             .send(CommandRequest {
     306            0 :                 payload,
     307            0 :                 response_tx,
     308            0 :             })
     309            0 :             .await
     310            0 :             .map_err(|_| SecondaryTenantError::ShuttingDown)?;
     311              : 
     312            0 :         let response = response_rx
     313            0 :             .await
     314            0 :             .map_err(|_| SecondaryTenantError::ShuttingDown)?;
     315              : 
     316            0 :         response.result
     317            0 :     }
     318              : 
     319            0 :     pub(crate) async fn upload_tenant(
     320            0 :         &self,
     321            0 :         tenant_shard_id: TenantShardId,
     322            0 :     ) -> Result<(), SecondaryTenantError> {
     323            0 :         self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
     324            0 :             .await
     325            0 :     }
     326            0 :     pub(crate) async fn download_tenant(
     327            0 :         &self,
     328            0 :         tenant_shard_id: TenantShardId,
     329            0 :     ) -> Result<(), SecondaryTenantError> {
     330            0 :         self.dispatch(
     331            0 :             &self.download_req_tx,
     332            0 :             DownloadCommand::Download(tenant_shard_id),
     333            0 :         )
     334            0 :         .await
     335            0 :     }
     336              : }
     337              : 
/// Handles to the two background tasks started by [`spawn_tasks`], together with the
/// cancellation token that was passed to it.
pub struct GlobalTasks {
    /// The token given to [`spawn_tasks`]; [`GlobalTasks::wait`] asserts it is cancelled.
    cancel: CancellationToken,
    /// Join handle of the "heatmap uploads" task.
    uploader: JoinHandle<()>,
    /// Join handle of the "secondary tenant downloads" task.
    downloader: JoinHandle<()>,
}
     343              : 
     344              : impl GlobalTasks {
     345              :     /// Caller is responsible for requesting shutdown via the cancellation token that was
     346              :     /// passed to [`spawn_tasks`].
     347              :     ///
     348              :     /// # Panics
     349              :     ///
     350              :     /// This method panics if that token is not cancelled.
     351              :     /// This is low-risk because we're calling this during process shutdown, so, a panic
     352              :     /// will be informative but not cause undue downtime.
     353            0 :     pub async fn wait(self) {
     354              :         let Self {
     355            0 :             cancel,
     356            0 :             uploader,
     357            0 :             downloader,
     358            0 :         } = self;
     359            0 :         assert!(
     360            0 :             cancel.is_cancelled(),
     361            0 :             "must cancel cancellation token, otherwise the tasks will not shut down"
     362              :         );
     363              : 
     364            0 :         let (uploader, downloader) = futures::future::join(uploader, downloader).await;
     365            0 :         uploader.expect(
     366            0 :             "unreachable: exit_on_panic_or_error would catch the panic and exit the process",
     367              :         );
     368            0 :         downloader.expect(
     369            0 :             "unreachable: exit_on_panic_or_error would catch the panic and exit the process",
     370              :         );
     371            0 :     }
     372              : }
     373              : 
     374            0 : pub fn spawn_tasks(
     375            0 :     tenant_manager: Arc<TenantManager>,
     376            0 :     remote_storage: GenericRemoteStorage,
     377            0 :     background_jobs_can_start: Barrier,
     378            0 :     cancel: CancellationToken,
     379            0 : ) -> (SecondaryController, GlobalTasks) {
     380            0 :     let mgr_clone = tenant_manager.clone();
     381            0 :     let storage_clone = remote_storage.clone();
     382            0 :     let bg_jobs_clone = background_jobs_can_start.clone();
     383              : 
     384            0 :     let (download_req_tx, download_req_rx) =
     385            0 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     386            0 :     let (upload_req_tx, upload_req_rx) =
     387            0 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     388              : 
     389            0 :     let cancel_clone = cancel.clone();
     390            0 :     let downloader = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
     391              :         "secondary tenant downloads",
     392            0 :         async move {
     393            0 :             downloader_task(
     394            0 :                 mgr_clone,
     395            0 :                 storage_clone,
     396            0 :                 download_req_rx,
     397            0 :                 bg_jobs_clone,
     398            0 :                 cancel_clone,
     399            0 :                 RequestContext::new(
     400            0 :                     TaskKind::SecondaryDownloads,
     401            0 :                     crate::context::DownloadBehavior::Download,
     402            0 :                 ),
     403            0 :             )
     404            0 :             .await;
     405            0 :             anyhow::Ok(())
     406            0 :         },
     407              :     ));
     408              : 
     409            0 :     let cancel_clone = cancel.clone();
     410            0 :     let uploader = BACKGROUND_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
     411              :         "heatmap uploads",
     412            0 :         async move {
     413            0 :             heatmap_uploader_task(
     414            0 :                 tenant_manager,
     415            0 :                 remote_storage,
     416            0 :                 upload_req_rx,
     417            0 :                 background_jobs_can_start,
     418            0 :                 cancel_clone,
     419            0 :             )
     420            0 :             .await;
     421            0 :             anyhow::Ok(())
     422            0 :         },
     423              :     ));
     424              : 
     425            0 :     (
     426            0 :         SecondaryController {
     427            0 :             upload_req_tx,
     428            0 :             download_req_tx,
     429            0 :         },
     430            0 :         GlobalTasks {
     431            0 :             cancel,
     432            0 :             uploader,
     433            0 :             downloader,
     434            0 :         },
     435            0 :     )
     436            0 : }
         |