LCOV - differential code coverage report
Current view: top level - pageserver/src/tenant - secondary.rs (source / functions) Coverage Total Hit UBC CBC
Current: cd44433dd675caa99df17a61b18949c8387e2242.info Lines: 91.4 % 140 128 12 128
Current Date: 2024-01-09 02:06:09 Functions: 75.0 % 24 18 6 18
Baseline: 66c52a629a0f4a503e193045e0df4c77139e344b.info
Baseline Date: 2024-01-08 15:34:46

           TLA  Line data    Source code
       1                 : mod downloader;
       2                 : pub mod heatmap;
       3                 : mod heatmap_uploader;
       4                 : mod scheduler;
       5                 : 
       6                 : use std::sync::Arc;
       7                 : 
       8                 : use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
       9                 : 
      10                 : use self::{
      11                 :     downloader::{downloader_task, SecondaryDetail},
      12                 :     heatmap_uploader::heatmap_uploader_task,
      13                 : };
      14                 : 
      15                 : use super::{config::SecondaryLocationConfig, mgr::TenantManager};
      16                 : 
      17                 : use pageserver_api::shard::TenantShardId;
      18                 : use remote_storage::GenericRemoteStorage;
      19                 : 
      20                 : use tokio_util::sync::CancellationToken;
      21                 : use utils::{completion::Barrier, sync::gate::Gate};
      22                 : 
      23                 : enum DownloadCommand {
      24                 :     Download(TenantShardId),
      25                 : }
      26                 : enum UploadCommand {
      27                 :     Upload(TenantShardId),
      28                 : }
      29                 : 
      30                 : impl UploadCommand {
      31 CBC           6 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      32               6 :         match self {
      33               6 :             Self::Upload(id) => id,
      34               6 :         }
      35               6 :     }
      36                 : }
      37                 : 
      38                 : impl DownloadCommand {
      39               4 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
      40               4 :         match self {
      41               4 :             Self::Download(id) => id,
      42               4 :         }
      43               4 :     }
      44                 : }
      45                 : 
      46                 : struct CommandRequest<T> {
      47                 :     payload: T,
      48                 :     response_tx: tokio::sync::oneshot::Sender<CommandResponse>,
      49                 : }
      50                 : 
      51                 : struct CommandResponse {
      52                 :     result: anyhow::Result<()>,
      53                 : }
      54                 : 
      55                 : // Whereas [`Tenant`] represents an attached tenant, this type represents the work
      56                 : // we do for secondary tenant locations: where we are not serving clients or
      57                 : // ingesting WAL, but we are maintaining a warm cache of layer files.
      58                 : //
      59                 : // This type is all about the _download_ path for secondary mode.  The upload path
      60                 : // runs separately (see [`heatmap_uploader`]) while a regular attached `Tenant` exists.
      61                 : //
      62                 : // This structure coordinates TenantManager and SecondaryDownloader,
      63                 : // so that the downloader can indicate which tenants it is currently
      64                 : // operating on, and the manager can indicate when a particular
      65                 : // secondary tenant should cancel any work in flight.
      66 UBC           0 : #[derive(Debug)]
      67                 : pub(crate) struct SecondaryTenant {
      68                 :     /// Carrying a tenant shard ID simplifies callers such as the downloader
      69                 :     /// which need to organize many of these objects by ID.
      70                 :     tenant_shard_id: TenantShardId,
      71                 : 
      72                 :     /// Cancellation token indicates to SecondaryDownloader that it should stop doing
      73                 :     /// any work for this tenant at the next opportunity.
      74                 :     pub(crate) cancel: CancellationToken,
      75                 : 
      76                 :     pub(crate) gate: Gate,
      77                 : 
      78                 :     detail: std::sync::Mutex<SecondaryDetail>,
      79                 : }
      80                 : 
      81                 : impl SecondaryTenant {
      82 CBC          27 :     pub(crate) fn new(
      83              27 :         tenant_shard_id: TenantShardId,
      84              27 :         config: &SecondaryLocationConfig,
      85              27 :     ) -> Arc<Self> {
      86              27 :         Arc::new(Self {
      87              27 :             tenant_shard_id,
      88              27 :             // todo: shall we make this a descendent of the
      89              27 :             // main cancellation token, or is it sufficient that
      90              27 :             // on shutdown we walk the tenants and fire their
      91              27 :             // individual cancellations?
      92              27 :             cancel: CancellationToken::new(),
      93              27 :             gate: Gate::new(format!("SecondaryTenant {tenant_shard_id}")),
      94              27 : 
      95              27 :             detail: std::sync::Mutex::new(SecondaryDetail::new(config.clone())),
      96              27 :         })
      97              27 :     }
      98                 : 
      99              21 :     pub(crate) async fn shutdown(&self) {
     100              21 :         self.cancel.cancel();
     101              21 : 
     102              21 :         // Wait for any secondary downloader work to complete
     103              21 :         self.gate.close().await;
     104              21 :     }
     105                 : 
     106               6 :     pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
     107               6 :         self.detail.lock().unwrap().config = config.clone();
     108               6 :     }
     109                 : 
     110              65 :     fn get_tenant_shard_id(&self) -> &TenantShardId {
     111              65 :         &self.tenant_shard_id
     112              65 :     }
     113                 : }
     114                 : 
     115                 : /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,
     116                 : /// and heatmap uploads.  This is not a hot data path: it's primarily a hook for tests,
     117                 : /// where we want to immediately upload/download for a particular tenant.  In normal operation
     118                 : /// uploads & downloads are autonomous and not driven by this interface.
     119                 : pub struct SecondaryController {
     120                 :     upload_req_tx: tokio::sync::mpsc::Sender<CommandRequest<UploadCommand>>,
     121                 :     download_req_tx: tokio::sync::mpsc::Sender<CommandRequest<DownloadCommand>>,
     122                 : }
     123                 : 
     124                 : impl SecondaryController {
     125              10 :     async fn dispatch<T>(
     126              10 :         &self,
     127              10 :         queue: &tokio::sync::mpsc::Sender<CommandRequest<T>>,
     128              10 :         payload: T,
     129              10 :     ) -> anyhow::Result<()> {
     130              10 :         let (response_tx, response_rx) = tokio::sync::oneshot::channel();
     131              10 : 
     132              10 :         queue
     133              10 :             .send(CommandRequest {
     134              10 :                 payload,
     135              10 :                 response_tx,
     136              10 :             })
     137 UBC           0 :             .await
     138 CBC          10 :             .map_err(|_| anyhow::anyhow!("Receiver shut down"))?;
     139                 : 
     140              10 :         let response = response_rx
     141              10 :             .await
     142              10 :             .map_err(|_| anyhow::anyhow!("Request dropped"))?;
     143                 : 
     144              10 :         response.result
     145              10 :     }
     146                 : 
     147               6 :     pub async fn upload_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     148               6 :         self.dispatch(&self.upload_req_tx, UploadCommand::Upload(tenant_shard_id))
     149               6 :             .await
     150               6 :     }
     151               4 :     pub async fn download_tenant(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
     152               4 :         self.dispatch(
     153               4 :             &self.download_req_tx,
     154               4 :             DownloadCommand::Download(tenant_shard_id),
     155               4 :         )
     156               4 :         .await
     157               4 :     }
     158                 : }
     159                 : 
     160             557 : pub fn spawn_tasks(
     161             557 :     tenant_manager: Arc<TenantManager>,
     162             557 :     remote_storage: GenericRemoteStorage,
     163             557 :     background_jobs_can_start: Barrier,
     164             557 :     cancel: CancellationToken,
     165             557 : ) -> SecondaryController {
     166             557 :     let mgr_clone = tenant_manager.clone();
     167             557 :     let storage_clone = remote_storage.clone();
     168             557 :     let cancel_clone = cancel.clone();
     169             557 :     let bg_jobs_clone = background_jobs_can_start.clone();
     170             557 : 
     171             557 :     let (download_req_tx, download_req_rx) =
     172             557 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     173             557 :     let (upload_req_tx, upload_req_rx) =
     174             557 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     175             557 : 
     176             557 :     task_mgr::spawn(
     177             557 :         BACKGROUND_RUNTIME.handle(),
     178             557 :         TaskKind::SecondaryDownloads,
     179             557 :         None,
     180             557 :         None,
     181             557 :         "secondary tenant downloads",
     182             557 :         false,
     183             557 :         async move {
     184             557 :             downloader_task(
     185             557 :                 mgr_clone,
     186             557 :                 storage_clone,
     187             557 :                 download_req_rx,
     188             557 :                 bg_jobs_clone,
     189             557 :                 cancel_clone,
     190             557 :             )
     191             907 :             .await;
     192                 : 
     193             159 :             Ok(())
     194             557 :         },
     195             557 :     );
     196             557 : 
     197             557 :     task_mgr::spawn(
     198             557 :         BACKGROUND_RUNTIME.handle(),
     199             557 :         TaskKind::SecondaryUploads,
     200             557 :         None,
     201             557 :         None,
     202             557 :         "heatmap uploads",
     203             557 :         false,
     204             557 :         async move {
     205             557 :             heatmap_uploader_task(
     206             557 :                 tenant_manager,
     207             557 :                 remote_storage,
     208             557 :                 upload_req_rx,
     209             557 :                 background_jobs_can_start,
     210             557 :                 cancel,
     211             557 :             )
     212             908 :             .await;
     213                 : 
     214             159 :             Ok(())
     215             557 :         },
     216             557 :     );
     217             557 : 
     218             557 :     SecondaryController {
     219             557 :         download_req_tx,
     220             557 :         upload_req_tx,
     221             557 :     }
     222             557 : }
     223                 : 
     224                 : /// For running with remote storage disabled: a SecondaryController that is connected to nothing.
     225 UBC           0 : pub fn null_controller() -> SecondaryController {
     226               0 :     let (download_req_tx, _download_req_rx) =
     227               0 :         tokio::sync::mpsc::channel::<CommandRequest<DownloadCommand>>(16);
     228               0 :     let (upload_req_tx, _upload_req_rx) =
     229               0 :         tokio::sync::mpsc::channel::<CommandRequest<UploadCommand>>(16);
     230               0 :     SecondaryController {
     231               0 :         upload_req_tx,
     232               0 :         download_req_tx,
     233               0 :     }
     234               0 : }
        

Generated by: LCOV version 2.1-beta