LCOV - code coverage report
Current view: top level - pageserver/src/tenant - secondary.rs (source / functions) Coverage Total Hit
Test: 691a4c28fe7169edd60b367c52d448a0a6605f1f.info Lines: 0.0 % 218 0
Test Date: 2024-05-10 13:18:37 Functions: 0.0 % 32 0

            Line data    Source code
       1              : mod downloader;
       2              : pub mod heatmap;
       3              : mod heatmap_uploader;
       4              : mod scheduler;
       5              : 
       6              : use std::{sync::Arc, time::SystemTime};
       7              : 
       8              : use crate::{
       9              :     config::PageServerConf,
      10              :     context::RequestContext,
      11              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      12              :     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
      13              :     virtual_file::MaybeFatalIo,
      14              : };
      15              : 
      16              : use self::{
      17              :     downloader::{downloader_task, SecondaryDetail},
      18              :     heatmap_uploader::heatmap_uploader_task,
      19              : };
      20              : 
      21              : use super::{
      22              :     config::{SecondaryLocationConfig, TenantConfOpt},
      23              :     mgr::TenantManager,
      24              :     remote_timeline_client::LayerFileMetadata,
      25              :     span::debug_assert_current_span_has_tenant_id,
      26              :     storage_layer::{layer::local_layer_path, LayerName},
      27              : };
      28              : 
      29              : use pageserver_api::{
      30              :     models,
      31              :     shard::{ShardIdentity, TenantShardId},
      32              : };
      33              : use remote_storage::GenericRemoteStorage;
      34              : 
      35              : use tokio_util::sync::CancellationToken;
      36              : use tracing::instrument;
      37              : use utils::{completion::Barrier, id::TimelineId, sync::gate::Gate};
      38              : 
      39              : enum DownloadCommand {
      40              :     Download(TenantShardId),
      41              : }
      42              : enum UploadCommand {
      43              :     Upload(TenantShardId),
      44              : }
      45              : 
      46              : impl UploadCommand {
      47            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      48            0 :         match self {
      49            0 :             Self::Upload(id) => id,
      50            0 :         }
      51            0 :     }
      52              : }
      53              : 
      54              : impl DownloadCommand {
      55            0 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      56            0 :         match self {
      57            0 :             Self::Download(id) => id,
      58            0 :         }
      59            0 :     }
      60              : }
      61              : 
      62              : struct CommandRequest<T> {
      63              :     payload: T,
      64              :     response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
      65              : }
      66              : 
      67              : struct CommandResponse {
      68              :     result: anyhow::Result<()>,
      69              : }
      70              : 
      71              : // Whereas [`Tenant`] represents an attached tenant, this type represents the work
      72              : // we do for secondary tenant locations: where we are not serving clients or
      73              : // ingesting WAL, but we are maintaining a warm cache of layer files.
      74              : //
      75              : // This type is all about the _download_ path for secondary mode.  The upload path
      76              : // runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
      77              : //
      78              : // This structure coordinates TenantManager and SecondaryDownloader,
      79              : // so that the downloader can indicate which tenants it is currently
      80              : // operating on, and the manager can indicate when a particular
      81              : // secondary tenant should cancel any work in flight.
      82              : #[derive(Debug)]
      83              : pub(crate) struct SecondaryTenant {
      84              :     /// Carrying a tenant shard ID simplifies callers such as the downloader
      85              :     /// which need to organize many of these objects by ID.
      86              :     tenant_shard_id: TenantShardId,
      87              : 
      88              :     /// Cancellation token indicates to SecondaryDownloader that it should stop doing
      89              :     /// any work for this tenant at the next opportunity.
      90              :     pub(crate) cancel: CancellationToken,
      91              : 
      92              :     pub(crate) gate: Gate,
      93              : 
      94              :     // Secondary mode does not need the full shard identity or the TenantConfOpt.  However,
      95              :     // storing these enables us to report our full LocationConf, enabling convenient reconciliation
      96              :     // by the control plane (see [`Self::get_location_conf`])
      97              :     shard_identity: ShardIdentity,
      98              :     tenant_conf: std::sync::Mutex<TenantConfOpt>,
      99              : 
     100              :     // Internal state used by the Downloader.
     101              :     detail: std::sync::Mutex<SecondaryDetail>,
     102              : 
     103              :     // Public state indicating overall progress of downloads relative to the last heatmap seen
     104              :     pub(crate) progress: std::sync::Mutex<models::SecondaryProgress>,
     105              : }
     106              : 
     107              : impl SecondaryTenant {
     108            0 :     pub(crate) fn new(
     109            0 :         tenant_shard_id: TenantShardId,
     110            0 :         shard_identity: ShardIdentity,
     111            0 :         tenant_conf: TenantConfOpt,
     112            0 :         config: &SecondaryLocationConfig,
     113            0 :     ) -> Arc<Self> {
     114            0 :         Arc::new(Self {
     115            0 :             tenant_shard_id,
     116            0 :             // todo: shall we make this a descendant of the
     117            0 :             // main cancellation token, or is it sufficient that
     118            0 :             // on shutdown we walk the tenants and fire their
     119            0 :             // individual cancellations?
     120            0 :             cancel: CancellationToken::new(),
     121            0 :             gate: Gate::default(),
     122            0 : 
     123            0 :             shard_identity,
     124            0 :             tenant_conf: std::sync::Mutex::new(tenant_conf),
     125            0 : 
     126            0 :             detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
     127            0 : 
     128            0 :             progress: std::sync::Mutex::default(),
     129            0 :         })
     130            0 :     }
     131              : 
     132            0 :     pub(crate) fn tenant_shard_id(&self) -> TenantShardId {
     133            0 :         self.tenant_shard_id
     134            0 :     }
     135              : 
     136            0 :     pub(crate) async fn shutdown(&self) {
     137            0 :         self.cancel.cancel();
     138            0 : 
     139            0 :         // Wait for any secondary downloader work to complete
     140            0 :         self.gate.close().await;
     141            0 :     }
     142              : 
     143            0 :     pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
     144            0 :         self.detail.lock().unwrap().config = config.clone();
     145            0 :     }
     146              : 
     147            0 :     pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
     148            0 :         *(self.tenant_conf.lock().unwrap()) = config.clone();
     149            0 :     }
     150              : 
     151              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
     152              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
     153              :     /// rare external API calls, like a reconciliation at startup.
     154            0 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
     155            0 :         let conf = self.detail.lock().unwrap().config.clone();
     156            0 : 
     157            0 :         let conf = models::LocationConfigSecondary { warm: conf.warm };
     158            0 : 
     159            0 :         let tenant_conf = self.tenant_conf.lock().unwrap().clone();
     160            0 :         models::LocationConfig {
     161            0 :             mode: models::LocationConfigMode::Secondary,
     162            0 :             generation: None,
     163            0 :             secondary_conf: Some(conf),
     164            0 :             shard_number: self.tenant_shard_id.shard_number.0,
     165            0 :             shard_count: self.tenant_shard_id.shard_count.literal(),
     166            0 :             shard_stripe_size: self.shard_identity.stripe_size.0,
     167            0 :             tenant_conf: tenant_conf.into(),
     168            0 :         }
     169            0 :     }
     170              : 
     171            0 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
     172            0 :         &self.tenant_shard_id
     173            0 :     }
     174              : 
     175            0 :     pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> (DiskUsageEvictionInfo, usize) {
     176            0 :         self.detail.lock().unwrap().get_layers_for_eviction(self)
     177            0 :     }
     178              : 
     179              :     /// Cancellation safe: if the caller is cancelled, the eviction still runs to completion.
     180            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
     181              :     pub(crate) async fn evict_layer(
     182              :         self: &Arc<Self>,
     183              :         conf: &PageServerConf,
     184              :         timeline_id: TimelineId,
     185              :         name: LayerName,
     186              :         metadata: LayerFileMetadata,
     187              :     ) {
     188              :         debug_assert_current_span_has_tenant_id();
     189              : 
     190              :         let guard = match self.gate.enter() {
     191              :             Ok(g) => g,
     192              :             Err(_) => {
     193              :                 tracing::debug!("Dropping layer evictions, secondary tenant shutting down",);
     194              :                 return;
     195              :             }
     196              :         };
     197              : 
     198              :         let now = SystemTime::now();
     199              : 
     200              :         let local_path = local_layer_path(
     201              :             conf,
     202              :             &self.tenant_shard_id,
     203              :             &timeline_id,
     204              :             &name,
     205              :             &metadata.generation,
     206              :         );
     207              : 
     208              :         let this = self.clone();
     209              : 
     210              :         // spawn it to be cancellation safe
     211            0 :         tokio::task::spawn_blocking(move || {
     212            0 :             let _guard = guard;
     213            0 :             // We tolerate ENOENT, because between planning eviction and executing
     214            0 :             // it, the secondary downloader could have seen an updated heatmap that
     215            0 :             // resulted in a layer being deleted.
     216            0 :             // Other local I/O errors are process-fatal: these should never happen.
     217            0 :             let deleted = std::fs::remove_file(local_path);
     218            0 : 
     219            0 :             let not_found = deleted
     220            0 :                 .as_ref()
     221            0 :                 .is_err_and(|x| x.kind() == std::io::ErrorKind::NotFound);
     222              : 
     223            0 :             let deleted = if not_found {
     224            0 :                 false
     225              :             } else {
     226            0 :                 deleted
     227            0 :                     .map(|()| true)
     228            0 :                     .fatal_err("Deleting layer during eviction")
     229              :             };
     230              : 
     231            0 :             if !deleted {
     232              :                 // skip updating accounting and recording a possibly later timestamp
     233            0 :                 return;
     234            0 :             }
     235            0 : 
     236            0 :             // Update the timeline's state.  This does not have to be synchronized with
     237            0 :             // the download process, because:
     238            0 :             // - If downloader is racing with us to remove a file (e.g. because it is
     239            0 :             //   removed from heatmap), then our mutual .remove() operations will both
     240            0 :             //   succeed.
     241            0 :             // - If downloader is racing with us to download the object (this would require
     242            0 :             //   multiple eviction iterations to race with multiple download iterations), then
     243            0 :             //   if we remove it from the state, the worst that happens is the downloader
     244            0 :             //   downloads it again before re-inserting, or we delete the file but it remains
     245            0 :             //   in the state map (in which case it will be downloaded if this secondary
     246            0 :             //   tenant transitions to attached and tries to access it)
     247            0 :             //
     248            0 :             // The important assumption here is that the secondary timeline state does not
     249            0 :             // have to 100% match what is on disk, because it's a best-effort warming
     250            0 :             // of the cache.
     251            0 :             let mut detail = this.detail.lock().unwrap();
     252            0 :             if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
     253            0 :                 timeline_detail.on_disk_layers.remove(&name);
     254            0 :                 timeline_detail.evicted_at.insert(name, now);
     255            0 :             }
     256            0 :         })
     257              :         .await
     258              :         .expect("secondary eviction should not have panicked");
     259              :     }
     260              : }
     261              : 
     262              : /// The SecondaryController is a pseudo-RPC client for administrative control of secondary mode downloads
     263              : /// and heatmap uploads.  This is not a hot data path: it's used for:
     264              : /// - Live migrations, where we want to ensure a migration destination has the freshest possible
     265              : ///   content before trying to cut over.
     266              : /// - Tests, where we want to immediately upload/download for a particular tenant.
     267              : ///
     268              : /// In normal operations, outside of migrations, uploads & downloads are autonomous and not driven by this interface.
     269              : pub struct SecondaryController {
     270              :     upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
     271              :     download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
     272              : }
     273              : 
     274              : impl SecondaryController {
     275            0 :     async fn dispatch<T>(
     276            0 :         &self,
     277            0 :         queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
     278            0 :         payload: T,
     279            0 :     ) -> anyhow::Result<()> {
     280            0 :         let (response_tx, response_rx) = tokio::sync::oneshot::channel();
     281            0 : 
     282            0 :         queue
     283            0 :             .send(CommandRequest {
     284            0 :                 payload,
     285            0 :                 response_tx,
     286            0 :             })
     287            0 :             .await
     288            0 :             .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
     289              : 
     290            0 :         let response = response_rx
     291            0 :             .await
     292            0 :             .map_err(|_| anyhow::anyhow!("Request dropped"))?;
     293              : 
     294            0 :         response.result
     295            0 :     }
     296              : 
     297            0 :     pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     298            0 :         self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
     299            0 :             .await
     300            0 :     }
     301            0 :     pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     302            0 :         self.dispatch(
     303            0 :             &self.download_req_tx,
     304            0 :             DownloadCommand::Download(tenant_shard_id),
     305            0 :         )
     306            0 :         .await
     307            0 :     }
     308              : }
     309              : 
     310            0 : pub fn spawn_tasks(
     311            0 :     tenant_manager: Arc<TenantManager>,
     312            0 :     remote_storage: GenericRemoteStorage,
     313            0 :     background_jobs_can_start: Barrier,
     314            0 :     cancel: CancellationToken,
     315            0 : ) -> SecondaryController {
     316            0 :     let mgr_clone = tenant_manager.clone();
     317            0 :     let storage_clone = remote_storage.clone();
     318            0 :     let cancel_clone = cancel.clone();
     319            0 :     let bg_jobs_clone = background_jobs_can_start.clone();
     320            0 : 
     321            0 :     let (download_req_tx, download_req_rx) =
     322            0 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     323            0 :     let (upload_req_tx, upload_req_rx) =
     324            0 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     325            0 : 
     326            0 :     let downloader_task_ctx = RequestContext::new(
     327            0 :         TaskKind::SecondaryDownloads,
     328            0 :         crate::context::DownloadBehavior::Download,
     329            0 :     );
     330            0 :     task_mgr::spawn(
     331            0 :         BACKGROUND_RUNTIME.handle(),
     332            0 :         downloader_task_ctx.task_kind(),
     333            0 :         None,
     334            0 :         None,
     335            0 :         "secondary tenant downloads",
     336            0 :         false,
     337            0 :         async move {
     338            0 :             downloader_task(
     339            0 :                 mgr_clone,
     340            0 :                 storage_clone,
     341            0 :                 download_req_rx,
     342            0 :                 bg_jobs_clone,
     343            0 :                 cancel_clone,
     344            0 :                 downloader_task_ctx,
     345            0 :             )
     346            0 :             .await;
     347              : 
     348            0 :             Ok(())
     349            0 :         },
     350            0 :     );
     351            0 : 
     352            0 :     task_mgr::spawn(
     353            0 :         BACKGROUND_RUNTIME.handle(),
     354            0 :         TaskKind::SecondaryUploads,
     355            0 :         None,
     356            0 :         None,
     357            0 :         "heatmap uploads",
     358            0 :         false,
     359            0 :         async move {
     360            0 :             heatmap_uploader_task(
     361            0 :                 tenant_manager,
     362            0 :                 remote_storage,
     363            0 :                 upload_req_rx,
     364            0 :                 background_jobs_can_start,
     365            0 :                 cancel,
     366            0 :             )
     367            0 :             .await;
     368              : 
     369            0 :             Ok(())
     370            0 :         },
     371            0 :     );
     372            0 : 
     373            0 :     SecondaryController {
     374            0 :         download_req_tx,
     375            0 :         upload_req_tx,
     376            0 :     }
     377            0 : }
     378              : 
     379              : /// For running with remote storage disabled: a SecondaryController that is connected to nothing.
     380            0 : pub fn null_controller() -> SecondaryController {
     381            0 :     let (download_req_tx, _download_req_rx) =
     382            0 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     383            0 :     let (upload_req_tx, _upload_req_rx) =
     384            0 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     385            0 :     SecondaryController {
     386            0 :         upload_req_tx,
     387            0 :         download_req_tx,
     388            0 :     }
     389            0 : }
        

Generated by: LCOV version 2.1-beta