LCOV - code coverage report
Current view: top level - pageserver/src/tenant - secondary.rs
Test:      32f4a56327bc9da697706839ed4836b2a00a408f.info
Test Date: 2024-02-07 07:37:29
Coverage:  Lines: 82.2 % (139 of 169)   Functions: 66.7 % (20 of 30)

            Line data    Source code
       1              : mod downloader;
       2              : pub mod heatmap;
       3              : mod heatmap_uploader;
       4              : mod scheduler;
       5              : 
       6              : use std::{sync::Arc, time::SystemTime};
       7              : 
       8              : use crate::{
       9              :     config::PageServerConf,
      10              :     disk_usage_eviction_task::DiskUsageEvictionInfo,
      11              :     task_mgr::{self, TaskKind, BACKGROUND_RUNTIME},
      12              :     virtual_file::MaybeFatalIo,
      13              : };
      14              : 
      15              : use self::{
      16              :     downloader::{downloader_task, SecondaryDetail},
      17              :     heatmap_uploader::heatmap_uploader_task,
      18              : };
      19              : 
      20              : use super::{
      21              :     config::{SecondaryLocationConfig, TenantConfOpt},
      22              :     mgr::TenantManager,
      23              :     span::debug_assert_current_span_has_tenant_id,
      24              :     storage_layer::LayerFileName,
      25              : };
      26              : 
      27              : use pageserver_api::{
      28              :     models,
      29              :     shard::{ShardIdentity, TenantShardId},
      30              : };
      31              : use remote_storage::GenericRemoteStorage;
      32              : 
      33              : use tokio_util::sync::CancellationToken;
      34              : use tracing::instrument;
      35              : use utils::{completion::Barrier, fs_ext, id::TimelineId, sync::gate::Gate};
      36              : 
      37              : enum DownloadCommand {
      38              :     Download(TenantShardId),
      39              : }
      40              : enum UploadCommand {
      41              :     Upload(TenantShardId),
      42              : }
      43              : 
      44              : impl UploadCommand {
      45            8 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      46            8 :         match self {
      47            8 :             Self::Upload(id) => id,
      48            8 :         }
      49            8 :     }
      50              : }
      51              : 
      52              : impl DownloadCommand {
      53            6 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      54            6 :         match self {
      55            6 :             Self::Download(id) => id,
      56            6 :         }
      57            6 :     }
      58              : }
      59              : 
      60              : struct CommandRequest<T> {
      61              :     payload: T,
      62              :     response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
      63              : }
      64              : 
      65              : struct CommandResponse {
      66              :     result: anyhow::Result<()>,
      67              : }
      68              : 
      69              : // Whereas [`Tenant`] represents an attached tenant, this type represents the work
      70              : // we do for secondary tenant locations: where we are not serving clients or
      71              : // ingesting WAL, but we are maintaining a warm cache of layer files.
      72              : //
      73              : // This type is all about the _download_ path for secondary mode.  The upload path
      74              : // runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
      75              : //
      76              : // This structure coordinates TenantManager and SecondaryDownloader,
      77              : // so that the downloader can indicate which tenants it is currently
      78              : // operating on, and the manager can indicate when a particular
      79              : // secondary tenant should cancel any work in flight.
      80            0 : #[derive(Debug)]
      81              : pub(crate) struct SecondaryTenant {
      82              :     /// Carrying a tenant shard ID simplifies callers such as the downloader
      83              :     /// which need to organize many of these objects by ID.
      84              :     tenant_shard_id: TenantShardId,
      85              : 
      86              :     /// Cancellation token indicates to SecondaryDownloader that it should stop doing
      87              :     /// any work for this tenant at the next opportunity.
      88              :     pub(crate) cancel: CancellationToken,
      89              : 
      90              :     pub(crate) gate: Gate,
      91              : 
      92              :     // Secondary mode does not need the full shard identity or the TenantConfOpt.  However,
      93              :     // storing these enables us to report our full LocationConf, enabling convenient reconciliation
      94              :     // by the control plane (see [`Self::get_location_conf`])
      95              :     shard_identity: ShardIdentity,
      96              :     tenant_conf: std::sync::Mutex<TenantConfOpt>,
      97              : 
      98              :     detail: std::sync::Mutex<SecondaryDetail>,
      99              : }
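
A note on the `cancel`/`gate` pair above: together they form the shutdown contract
described in the comment. Background work first enters the gate, then races its I/O
against the cancellation token. A minimal sketch, not part of this file, of how a
downloader-side iteration might honor that contract; `do_download` is a hypothetical
stand-in for the real download work:

    async fn download_iteration(tenant: &SecondaryTenant) {
        // gate.enter() fails once shutdown() has called gate.close():
        // at that point there is no more work to do for this tenant.
        let _guard = match tenant.gate.enter() {
            Ok(g) => g,
            Err(_) => return, // tenant is shutting down
        };

        // Race the work against cancellation so that shutdown(), which
        // waits on the gate, is not blocked behind a long download.
        tokio::select! {
            _ = tenant.cancel.cancelled() => {}
            _ = do_download(tenant) => {}
        }
    }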
     100              : 
     101              : impl SecondaryTenant {
     102           32 :     pub(crate) fn new(
     103           32 :         tenant_shard_id: TenantShardId,
     104           32 :         shard_identity: ShardIdentity,
     105           32 :         tenant_conf: TenantConfOpt,
     106           32 :         config: &SecondaryLocationConfig,
     107           32 :     ) -> Arc<Self> {
     108           32 :         Arc::new(Self {
     109           32 :             tenant_shard_id,
      110           32 :             // todo: shall we make this a descendant of the
     111           32 :             // main cancellation token, or is it sufficient that
     112           32 :             // on shutdown we walk the tenants and fire their
     113           32 :             // individual cancellations?
     114           32 :             cancel: CancellationToken::new(),
     115           32 :             gate: Gate::default(),
     116           32 : 
     117           32 :             shard_identity,
     118           32 :             tenant_conf: std::sync::Mutex::new(tenant_conf),
     119           32 : 
     120           32 :             detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
     121           32 :         })
     122           32 :     }
     123              : 
     124           22 :     pub(crate) async fn shutdown(&self) {
     125           22 :         self.cancel.cancel();
     126           22 : 
     127           22 :         // Wait for any secondary downloader work to complete
     128           22 :         self.gate.close().await;
     129           22 :     }
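
On the `todo` in `new()` above: `tokio_util`'s CancellationToken does support deriving
per-tenant tokens from a process-wide one, so the descendant design is available if
wanted. A sketch under that assumption; `global_cancel` is hypothetical:

    // child_token() is cancelled whenever the parent is cancelled, but
    // can also be cancelled individually without affecting the parent.
    let cancel = global_cancel.child_token();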
     130              : 
     131            6 :     pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
     132            6 :         self.detail.lock().unwrap().config = config.clone();
     133            6 :     }
     134              : 
     135            6 :     pub(crate) fn set_tenant_conf(&self, config: &TenantConfOpt) {
     136            6 :         *(self.tenant_conf.lock().unwrap()) = *config;
     137            6 :     }
     138              : 
     139              :     /// For API access: generate a LocationConfig equivalent to the one that would be used to
     140              :     /// create a Tenant in the same state.  Do not use this in hot paths: it's for relatively
     141              :     /// rare external API calls, like a reconciliation at startup.
     142            0 :     pub(crate) fn get_location_conf(&self) -> models::LocationConfig {
     143            0 :         let conf = self.detail.lock().unwrap().config.clone();
     144            0 : 
     145            0 :         let conf = models::LocationConfigSecondary { warm: conf.warm };
     146            0 : 
     147            0 :         let tenant_conf = *self.tenant_conf.lock().unwrap();
     148            0 :         models::LocationConfig {
     149            0 :             mode: models::LocationConfigMode::Secondary,
     150            0 :             generation: None,
     151            0 :             secondary_conf: Some(conf),
     152            0 :             shard_number: self.tenant_shard_id.shard_number.0,
     153            0 :             shard_count: self.tenant_shard_id.shard_count.0,
     154            0 :             shard_stripe_size: self.shard_identity.stripe_size.0,
     155            0 :             tenant_conf: tenant_conf.into(),
     156            0 :         }
     157            0 :     }
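
As the doc comment says, this getter exists so an external reconciler can compare the
pageserver's reported location against its intended one. A hedged sketch of that
comparison; `needs_update` and the use of serde_json for structural equality are
assumptions for illustration:

    fn needs_update(
        observed: &models::LocationConfig,
        intended: &models::LocationConfig,
    ) -> bool {
        // Assumes the API models implement serde::Serialize, as
        // pageserver wire types generally do.
        serde_json::to_value(observed).unwrap() != serde_json::to_value(intended).unwrap()
    }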
     158              : 
     159           98 :     pub(crate) fn get_tenant_shard_id(&self) -> &TenantShardId {
     160           98 :         &self.tenant_shard_id
     161           98 :     }
     162              : 
     163            2 :     pub(crate) fn get_layers_for_eviction(self: &Arc<Self>) -> DiskUsageEvictionInfo {
     164            2 :         self.detail.lock().unwrap().get_layers_for_eviction(self)
     165            2 :     }
     166              : 
     167            0 :     #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline_id, name=%name))]
     168              :     pub(crate) async fn evict_layer(
     169              :         &self,
     170              :         conf: &PageServerConf,
     171              :         timeline_id: TimelineId,
     172              :         name: LayerFileName,
     173              :     ) {
     174              :         debug_assert_current_span_has_tenant_id();
     175              : 
     176              :         let _guard = match self.gate.enter() {
     177              :             Ok(g) => g,
     178              :             Err(_) => {
      179            0 :                 tracing::debug!("Dropping layer evictions, secondary tenant shutting down");
     180              :                 return;
     181              :             }
     182              :         };
     183              : 
     184              :         let now = SystemTime::now();
     185              : 
     186              :         let path = conf
     187              :             .timeline_path(&self.tenant_shard_id, &timeline_id)
     188              :             .join(name.file_name());
     189              : 
     190              :         // We tolerate ENOENT, because between planning eviction and executing
     191              :         // it, the secondary downloader could have seen an updated heatmap that
     192              :         // resulted in a layer being deleted.
     193              :         // Other local I/O errors are process-fatal: these should never happen.
     194              :         tokio::fs::remove_file(path)
     195              :             .await
     196              :             .or_else(fs_ext::ignore_not_found)
     197              :             .fatal_err("Deleting layer during eviction");
     198              : 
     199              :         // Update the timeline's state.  This does not have to be synchronized with
     200              :         // the download process, because:
     201              :         // - If downloader is racing with us to remove a file (e.g. because it is
     202              :         //   removed from heatmap), then our mutual .remove() operations will both
     203              :         //   succeed.
     204              :         // - If downloader is racing with us to download the object (this would require
     205              :         //   multiple eviction iterations to race with multiple download iterations), then
     206              :         //   if we remove it from the state, the worst that happens is the downloader
     207              :         //   downloads it again before re-inserting, or we delete the file but it remains
     208              :         //   in the state map (in which case it will be downloaded if this secondary
     209              :         //   tenant transitions to attached and tries to access it)
     210              :         //
     211              :         // The important assumption here is that the secondary timeline state does not
     212              :         // have to 100% match what is on disk, because it's a best-effort warming
     213              :         // of the cache.
     214              :         let mut detail = self.detail.lock().unwrap();
     215              :         if let Some(timeline_detail) = detail.timelines.get_mut(&timeline_id) {
     216              :             timeline_detail.on_disk_layers.remove(&name);
     217              :             timeline_detail.evicted_at.insert(name, now);
     218              :         }
     219              :     }
     220              : }
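
The ENOENT-tolerant delete in `evict_layer` is a reusable pattern: treat NotFound as
success, because the desired end state (file absent) holds either way, and racing
deleters both succeed. A self-contained sketch of what such a helper amounts to; this
standalone version is illustrative, not the real `fs_ext::ignore_not_found`:

    use std::io;

    fn ignore_not_found(e: io::Error) -> io::Result<()> {
        if e.kind() == io::ErrorKind::NotFound {
            Ok(()) // someone else already removed the file
        } else {
            Err(e)
        }
    }

    // Usage: an idempotent delete.
    // std::fs::remove_file(&path).or_else(ignore_not_found)?;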
     221              : 
     222              : /// The SecondaryController is a pseudo-RPC client for administrative control of secondary mode
     223              : /// downloads and heatmap uploads.  This is not a hot data path: it's primarily a hook for tests,
     224              : /// where we want to immediately upload/download for a particular tenant.  In normal operation
     225              : /// uploads & downloads are autonomous and not driven by this interface.
     226              : pub struct SecondaryController {
     227              :     upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
     228              :     download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
     229              : }
     230              : 
     231              : impl SecondaryController {
     232           14 :     async fn dispatch<T>(
     233           14 :         &self,
     234           14 :         queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
     235           14 :         payload: T,
     236           14 :     ) -> anyhow::Result<()> {
     237           14 :         let (response_tx, response_rx) = tokio::sync::oneshot::channel();
     238           14 : 
     239           14 :         queue
     240           14 :             .send(CommandRequest {
     241           14 :                 payload,
     242           14 :                 response_tx,
     243           14 :             })
     244            0 :             .await
     245           14 :             .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
     246              : 
     247           14 :         let response = response_rx
     248           14 :             .await
     249           14 :             .map_err(|_| anyhow::anyhow!("Request dropped"))?;
     250              : 
     251           14 :         response.result
     252           14 :     }
     253              : 
     254            8 :     pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     255            8 :         self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
     256            8 :             .await
     257            8 :     }
     258            6 :     pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     259            6 :         self.dispatch(
     260            6 :             &self.download_req_tx,
     261            6 :             DownloadCommand::Download(tenant_shard_id),
     262            6 :         )
     263            6 :         .await
     264            6 :     }
     265              : }
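
`dispatch` is one half of a request/oneshot-response round trip; the receiving loops
live in `downloader_task` / `heatmap_uploader_task`, outside this file. A minimal
sketch of how such a loop might consume `CommandRequest`; the `handle` closure is a
hypothetical command executor:

    async fn command_loop<T>(
        mut rx: tokio::sync::mpsc::Receiver<CommandRequest<T>>,
        handle: impl Fn(T) -> anyhow::Result<()>,
    ) {
        while let Some(req) = rx.recv().await {
            let result = handle(req.payload);
            // If the caller stopped waiting, the oneshot send fails;
            // nobody is left to notify, so the error is ignored.
            let _ = req.response_tx.send(CommandResponse { result });
        }
    }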
     266              : 
     267          604 : pub fn spawn_tasks(
     268          604 :     tenant_manager: Arc<TenantManager>,
     269          604 :     remote_storage: GenericRemoteStorage,
     270          604 :     background_jobs_can_start: Barrier,
     271          604 :     cancel: CancellationToken,
     272          604 : ) -> SecondaryController {
     273          604 :     let mgr_clone = tenant_manager.clone();
     274          604 :     let storage_clone = remote_storage.clone();
     275          604 :     let cancel_clone = cancel.clone();
     276          604 :     let bg_jobs_clone = background_jobs_can_start.clone();
     277          604 : 
     278          604 :     let (download_req_tx, download_req_rx) =
     279          604 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     280          604 :     let (upload_req_tx, upload_req_rx) =
     281          604 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     282          604 : 
     283          604 :     task_mgr::spawn(
     284          604 :         BACKGROUND_RUNTIME.handle(),
     285          604 :         TaskKind::SecondaryDownloads,
     286          604 :         None,
     287          604 :         None,
     288          604 :         "secondary tenant downloads",
     289          604 :         false,
     290          604 :         async move {
     291          604 :             downloader_task(
     292          604 :                 mgr_clone,
     293          604 :                 storage_clone,
     294          604 :                 download_req_rx,
     295          604 :                 bg_jobs_clone,
     296          604 :                 cancel_clone,
     297          604 :             )
     298          946 :             .await;
     299              : 
     300          172 :             Ok(())
     301          604 :         },
     302          604 :     );
     303          604 : 
     304          604 :     task_mgr::spawn(
     305          604 :         BACKGROUND_RUNTIME.handle(),
     306          604 :         TaskKind::SecondaryUploads,
     307          604 :         None,
     308          604 :         None,
     309          604 :         "heatmap uploads",
     310          604 :         false,
     311          604 :         async move {
     312          604 :             heatmap_uploader_task(
     313          604 :                 tenant_manager,
     314          604 :                 remote_storage,
     315          604 :                 upload_req_rx,
     316          604 :                 background_jobs_can_start,
     317          604 :                 cancel,
     318          604 :             )
     319          946 :             .await;
     320              : 
     321          172 :             Ok(())
     322          604 :         },
     323          604 :     );
     324          604 : 
     325          604 :     SecondaryController {
     326          604 :         download_req_tx,
     327          604 :         upload_req_tx,
     328          604 :     }
     329          604 : }
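
A hedged sketch of how startup code might wire this up; `maybe_remote_storage` and the
surrounding bindings are assumptions, and the real call site is elsewhere in pageserver
startup:

    let secondary_controller = match maybe_remote_storage {
        // Remote storage configured: run the download/upload loops.
        Some(remote_storage) => spawn_tasks(
            tenant_manager.clone(),
            remote_storage,
            background_jobs_can_start.clone(),
            cancel.clone(),
        ),
        // No remote storage: nothing to upload or download.
        None => null_controller(),
    };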
     330              : 
     331              : /// For running with remote storage disabled: a SecondaryController that is connected to nothing.
     332            0 : pub fn null_controller() -> SecondaryController {
     333            0 :     let (download_req_tx, _download_req_rx) =
     334            0 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     335            0 :     let (upload_req_tx, _upload_req_rx) =
     336            0 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     337            0 :     SecondaryController {
     338            0 :         upload_req_tx,
     339            0 :         download_req_tx,
     340            0 :     }
     341            0 : }
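
Because the receiver halves are dropped immediately above, any command sent to a null
controller fails fast in `dispatch` with "Receiver shut down" rather than hanging. A
test-style sketch; `some_tenant_shard_id()` is a hypothetical helper:

    #[tokio::test]
    async fn null_controller_rejects_commands() {
        let controller = null_controller();
        let err = controller
            .upload_tenant(some_tenant_shard_id())
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Receiver shut down"));
    }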
        

Generated by: LCOV version 2.1-beta