LCOV - code coverage report
Current view: top level - pageserver/src - basebackup_cache.rs (source / functions) Coverage Total Hit
Test: 472031e0b71f3195f7f21b1f2b20de09fd07bb56.info Lines: 0.0 % 349 0
Test Date: 2025-05-26 10:37:33 Functions: 0.0 % 19 0

            Line data    Source code
       1              : use std::{collections::HashMap, sync::Arc};
       2              : 
       3              : use async_compression::tokio::write::GzipEncoder;
       4              : use camino::{Utf8Path, Utf8PathBuf};
       5              : use metrics::core::{AtomicU64, GenericCounter};
       6              : use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
       7              : use tokio::{
       8              :     io::{AsyncWriteExt, BufWriter},
       9              :     sync::mpsc::{UnboundedReceiver, UnboundedSender},
      10              : };
      11              : use tokio_util::sync::CancellationToken;
      12              : use utils::{
      13              :     id::{TenantId, TenantTimelineId, TimelineId},
      14              :     lsn::Lsn,
      15              :     shard::TenantShardId,
      16              : };
      17              : 
      18              : use crate::{
      19              :     basebackup::send_basebackup_tarball,
      20              :     context::{DownloadBehavior, RequestContext},
      21              :     metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
      22              :     task_mgr::TaskKind,
      23              :     tenant::{
      24              :         Timeline,
      25              :         mgr::{TenantManager, TenantSlot},
      26              :     },
      27              : };
      28              : 
      29              : pub struct BasebackupPrepareRequest {
      30              :     pub tenant_shard_id: TenantShardId,
      31              :     pub timeline_id: TimelineId,
      32              :     pub lsn: Lsn,
      33              : }
      34              : 
      35              : pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
      36              : pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
      37              : 
      38              : type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
      39              : type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
      40              : 
      41              : /// BasebackupCache stores cached basebackup archives for timelines on local disk.
      42              : ///
      43              : /// The main purpose of this cache is to speed up the startup process of compute nodes
      44              : /// after scaling to zero.
      45              : /// Thus, the basebackup is stored only for the latest LSN of the timeline and with
      46              : /// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
      47              : ///
      48              : /// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
      49              : /// generates a basebackup from the timeline in the background, and stores it on disk.
      50              : ///
      51              : /// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
      52              : /// and ~1 RPS for get requests.
      53              : pub struct BasebackupCache {
      54              :     data_dir: Utf8PathBuf,
      55              :     config: BasebackupCacheConfig,
      56              :     tenant_manager: Arc<TenantManager>,
      57              :     remove_entry_sender: BasebackupRemoveEntrySender,
      58              : 
      59              :     entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
      60              : 
      61              :     cancel: CancellationToken,
      62              : 
      63              :     read_hit_count: GenericCounter<AtomicU64>,
      64              :     read_miss_count: GenericCounter<AtomicU64>,
      65              :     read_err_count: GenericCounter<AtomicU64>,
      66              : 
      67              :     prepare_ok_count: GenericCounter<AtomicU64>,
      68              :     prepare_skip_count: GenericCounter<AtomicU64>,
      69              :     prepare_err_count: GenericCounter<AtomicU64>,
      70              : }
      71              : 
      72              : impl BasebackupCache {
      73              :     /// Creates a BasebackupCache and spawns the background task.
      74              :     /// The initialization of the cache is performed in the background and does not
      75              :     /// block the caller. The cache will return `None` for any get requests until
      76              :     /// initialization is complete.
      77            0 :     pub fn spawn(
      78            0 :         runtime_handle: &tokio::runtime::Handle,
      79            0 :         data_dir: Utf8PathBuf,
      80            0 :         config: Option<BasebackupCacheConfig>,
      81            0 :         prepare_receiver: BasebackupPrepareReceiver,
      82            0 :         tenant_manager: Arc<TenantManager>,
      83            0 :         cancel: CancellationToken,
      84            0 :     ) -> Arc<Self> {
      85            0 :         let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
      86            0 : 
      87            0 :         let enabled = config.is_some();
      88            0 : 
      89            0 :         let cache = Arc::new(BasebackupCache {
      90            0 :             data_dir,
      91            0 :             config: config.unwrap_or_default(),
      92            0 :             tenant_manager,
      93            0 :             remove_entry_sender,
      94            0 : 
      95            0 :             entries: std::sync::Mutex::new(HashMap::new()),
      96            0 : 
      97            0 :             cancel,
      98            0 : 
      99            0 :             read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
     100            0 :             read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
     101            0 :             read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
     102            0 : 
     103            0 :             prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
     104            0 :             prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
     105            0 :             prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
     106            0 :         });
     107            0 : 
     108            0 :         if enabled {
     109            0 :             runtime_handle.spawn(
     110            0 :                 cache
     111            0 :                     .clone()
     112            0 :                     .background(prepare_receiver, remove_entry_receiver),
     113            0 :             );
     114            0 :         }
     115              : 
     116            0 :         cache
     117            0 :     }
     118              : 
     119              :     /// Gets a basebackup entry from the cache.
     120              :     /// If the entry is found, opens a file with the basebackup archive and returns it.
     121              :     /// The open file descriptor will prevent the file system from deleting the file
     122              :     /// even if the entry is removed from the cache in the background.
     123            0 :     pub async fn get(
     124            0 :         &self,
     125            0 :         tenant_id: TenantId,
     126            0 :         timeline_id: TimelineId,
     127            0 :         lsn: Lsn,
     128            0 :     ) -> Option<tokio::fs::File> {
     129            0 :         // Fast path. Check if the entry exists using the in-memory state.
     130            0 :         let tti = TenantTimelineId::new(tenant_id, timeline_id);
     131            0 :         if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
     132            0 :             self.read_miss_count.inc();
     133            0 :             return None;
     134            0 :         }
     135            0 : 
     136            0 :         let path = self.entry_path(tenant_id, timeline_id, lsn);
     137            0 : 
     138            0 :         match tokio::fs::File::open(path).await {
     139            0 :             Ok(file) => {
     140            0 :                 self.read_hit_count.inc();
     141            0 :                 Some(file)
     142              :             }
     143            0 :             Err(e) => {
     144            0 :                 if e.kind() == std::io::ErrorKind::NotFound {
     145            0 :                     // We may end up here if the basebackup was concurrently removed by the cleanup task.
     146            0 :                     self.read_miss_count.inc();
     147            0 :                 } else {
     148            0 :                     self.read_err_count.inc();
     149            0 :                     tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
     150              :                 }
     151            0 :                 None
     152              :             }
     153              :         }
     154            0 :     }
     155              : 
     156              :     // Private methods.
     157              : 
     158            0 :     fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
     159            0 :         // The default format for LSN is 0/ABCDEF.
     160            0 :         // The backslash is not filename friendly, so serialize it as plain hex.
     161            0 :         let lsn = lsn.0;
     162            0 :         format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
     163            0 :     }
     164              : 
     165            0 :     fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
     166            0 :         self.data_dir
     167            0 :             .join(Self::entry_filename(tenant_id, timeline_id, lsn))
     168            0 :     }
     169              : 
     170            0 :     fn entry_tmp_path(
     171            0 :         &self,
     172            0 :         tenant_id: TenantId,
     173            0 :         timeline_id: TimelineId,
     174            0 :         lsn: Lsn,
     175            0 :     ) -> Utf8PathBuf {
     176            0 :         self.data_dir
     177            0 :             .join("tmp")
     178            0 :             .join(Self::entry_filename(tenant_id, timeline_id, lsn))
     179            0 :     }
     180              : 
     181            0 :     fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
     182            0 :         let parts: Vec<&str> = filename
     183            0 :             .strip_prefix("basebackup_")?
     184            0 :             .strip_suffix(".tar.gz")?
     185            0 :             .split('_')
     186            0 :             .collect();
     187            0 :         if parts.len() != 3 {
     188            0 :             return None;
     189            0 :         }
     190            0 :         let tenant_id = parts[0].parse::<TenantId>().ok()?;
     191            0 :         let timeline_id = parts[1].parse::<TimelineId>().ok()?;
     192            0 :         let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
     193              : 
     194            0 :         Some((tenant_id, timeline_id, lsn))
     195            0 :     }
     196              : 
     197            0 :     async fn cleanup(&self) -> anyhow::Result<()> {
     198            0 :         // Cleanup tmp directory.
     199            0 :         let tmp_dir = self.data_dir.join("tmp");
     200            0 :         let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
     201            0 :         while let Some(dir_entry) = tmp_dir.next_entry().await? {
     202            0 :             if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
     203            0 :                 tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
     204            0 :             }
     205              :         }
     206              : 
     207              :         // Remove outdated entries.
     208            0 :         let entries_old = self.entries.lock().unwrap().clone();
     209            0 :         let mut entries_new = HashMap::new();
     210            0 :         for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
     211            0 :             if !tenant_shard_id.is_shard_zero() {
     212            0 :                 continue;
     213            0 :             }
     214            0 :             let TenantSlot::Attached(tenant) = tenant_slot else {
     215            0 :                 continue;
     216              :             };
     217            0 :             let tenant_id = tenant_shard_id.tenant_id;
     218              : 
     219            0 :             for timeline in tenant.list_timelines() {
     220            0 :                 let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
     221            0 :                 if let Some(&entry_lsn) = entries_old.get(&tti) {
     222            0 :                     if timeline.get_last_record_lsn() <= entry_lsn {
     223            0 :                         entries_new.insert(tti, entry_lsn);
     224            0 :                     }
     225            0 :                 }
     226              :             }
     227              :         }
     228              : 
     229            0 :         for (&tti, &lsn) in entries_old.iter() {
     230            0 :             if !entries_new.contains_key(&tti) {
     231            0 :                 self.remove_entry_sender
     232            0 :                     .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
     233            0 :                     .unwrap();
     234            0 :             }
     235              :         }
     236              : 
     237            0 :         BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
     238            0 :         *self.entries.lock().unwrap() = entries_new;
     239            0 : 
     240            0 :         Ok(())
     241            0 :     }
     242              : 
     243            0 :     async fn on_startup(&self) -> anyhow::Result<()> {
     244            0 :         // Create data_dir and tmp directory if they do not exist.
     245            0 :         tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
     246            0 :             .await
     247            0 :             .map_err(|e| {
     248            0 :                 anyhow::anyhow!(
     249            0 :                     "Failed to create basebackup cache data_dir {:?}: {:?}",
     250            0 :                     self.data_dir,
     251            0 :                     e
     252            0 :                 )
     253            0 :             })?;
     254              : 
     255              :         // Read existing entries from the data_dir and add them to in-memory state.
     256            0 :         let mut entries = HashMap::new();
     257            0 :         let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
     258            0 :         while let Some(dir_entry) = dir.next_entry().await? {
     259            0 :             let filename = dir_entry.file_name();
     260            0 : 
     261            0 :             if filename == "tmp" {
     262              :                 // Skip the tmp directory.
     263            0 :                 continue;
     264            0 :             }
     265            0 : 
     266            0 :             let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
     267            0 :             let Some((tenant_id, timeline_id, lsn)) = parsed else {
     268            0 :                 tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
     269            0 :                 continue;
     270              :             };
     271              : 
     272            0 :             let tti = TenantTimelineId::new(tenant_id, timeline_id);
     273              : 
     274              :             use std::collections::hash_map::Entry::*;
     275              : 
     276            0 :             match entries.entry(tti) {
     277            0 :                 Occupied(mut entry) => {
     278            0 :                     let entry_lsn = *entry.get();
     279            0 :                     // Leave only the latest entry, remove the old one.
     280            0 :                     if lsn < entry_lsn {
     281            0 :                         self.remove_entry_sender.send(self.entry_path(
     282            0 :                             tenant_id,
     283            0 :                             timeline_id,
     284            0 :                             lsn,
     285            0 :                         ))?;
     286            0 :                     } else if lsn > entry_lsn {
     287            0 :                         self.remove_entry_sender.send(self.entry_path(
     288            0 :                             tenant_id,
     289            0 :                             timeline_id,
     290            0 :                             entry_lsn,
     291            0 :                         ))?;
     292            0 :                         entry.insert(lsn);
     293              :                     } else {
     294              :                         // Two different filenames parsed to the same timline_id and LSN.
     295              :                         // Should never happen.
     296            0 :                         return Err(anyhow::anyhow!(
     297            0 :                             "Duplicate basebackup cache entry with the same LSN: {:?}",
     298            0 :                             filename
     299            0 :                         ));
     300              :                     }
     301              :                 }
     302            0 :                 Vacant(entry) => {
     303            0 :                     entry.insert(lsn);
     304            0 :                 }
     305              :             }
     306              :         }
     307              : 
     308            0 :         BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
     309            0 :         *self.entries.lock().unwrap() = entries;
     310            0 : 
     311            0 :         Ok(())
     312            0 :     }
     313              : 
     314            0 :     async fn background(
     315            0 :         self: Arc<Self>,
     316            0 :         mut prepare_receiver: BasebackupPrepareReceiver,
     317            0 :         mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
     318            0 :     ) {
     319            0 :         // Panic in the background is a safe fallback.
     320            0 :         // It will drop receivers and the cache will be effectively disabled.
     321            0 :         self.on_startup()
     322            0 :             .await
     323            0 :             .expect("Failed to initialize basebackup cache");
     324            0 : 
     325            0 :         let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
     326            0 :         cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
     327              : 
     328              :         loop {
     329            0 :             tokio::select! {
     330            0 :                 Some(req) = prepare_receiver.recv() => {
     331            0 :                     if let Err(err) = self.prepare_basebackup(
     332            0 :                         req.tenant_shard_id,
     333            0 :                         req.timeline_id,
     334            0 :                         req.lsn,
     335            0 :                     ).await {
     336            0 :                         tracing::info!("Failed to prepare basebackup: {:#}", err);
     337            0 :                         self.prepare_err_count.inc();
     338            0 :                         continue;
     339            0 :                     }
     340              :                 }
     341            0 :                 Some(req) = remove_entry_receiver.recv() => {
     342            0 :                     if let Err(e) = tokio::fs::remove_file(req).await {
     343            0 :                         tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
     344            0 :                     }
     345              :                 }
     346            0 :                 _ = cleanup_ticker.tick() => {
     347            0 :                     self.cleanup().await.unwrap_or_else(|e| {
     348            0 :                         tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
     349            0 :                     });
     350            0 :                 }
     351            0 :                 _ = self.cancel.cancelled() => {
     352            0 :                     tracing::info!("BasebackupCache background task cancelled");
     353            0 :                     break;
     354            0 :                 }
     355            0 :             }
     356            0 :         }
     357            0 :     }
     358              : 
     359              :     /// Prepare a basebackup for the given timeline.
     360              :     ///
     361              :     /// If the basebackup already exists with a higher LSN or the timeline already
     362              :     /// has a higher last_record_lsn, skip the preparation.
     363              :     ///
     364              :     /// The basebackup is prepared in a temporary directory and then moved to the final
     365              :     /// location to make the operation atomic.
     366            0 :     async fn prepare_basebackup(
     367            0 :         &self,
     368            0 :         tenant_shard_id: TenantShardId,
     369            0 :         timeline_id: TimelineId,
     370            0 :         req_lsn: Lsn,
     371            0 :     ) -> anyhow::Result<()> {
     372            0 :         tracing::info!(
     373              :             tenant_id = %tenant_shard_id.tenant_id,
     374              :             %timeline_id,
     375              :             %req_lsn,
     376            0 :             "Preparing basebackup for timeline",
     377              :         );
     378              : 
     379            0 :         let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
     380            0 : 
     381            0 :         {
     382            0 :             let entries = self.entries.lock().unwrap();
     383            0 :             if let Some(&entry_lsn) = entries.get(&tti) {
     384            0 :                 if entry_lsn >= req_lsn {
     385            0 :                     tracing::info!(
     386              :                         %timeline_id,
     387              :                         %req_lsn,
     388              :                         %entry_lsn,
     389            0 :                         "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
     390              :                     );
     391            0 :                     self.prepare_skip_count.inc();
     392            0 :                     return Ok(());
     393            0 :                 }
     394            0 :             }
     395              : 
     396            0 :             if entries.len() as i64 >= self.config.max_size_entries {
     397            0 :                 tracing::info!(
     398              :                     %timeline_id,
     399              :                     %req_lsn,
     400            0 :                     "Basebackup cache is full, skipping basebackup",
     401              :                 );
     402            0 :                 self.prepare_skip_count.inc();
     403            0 :                 return Ok(());
     404            0 :             }
     405              :         }
     406              : 
     407            0 :         let tenant = self
     408            0 :             .tenant_manager
     409            0 :             .get_attached_tenant_shard(tenant_shard_id)?;
     410              : 
     411            0 :         let tenant_state = tenant.current_state();
     412            0 :         if tenant_state != TenantState::Active {
     413            0 :             anyhow::bail!(
     414            0 :                 "Tenant {} is not active, current state: {:?}",
     415            0 :                 tenant_shard_id.tenant_id,
     416            0 :                 tenant_state
     417            0 :             )
     418            0 :         }
     419              : 
     420            0 :         let timeline = tenant.get_timeline(timeline_id, true)?;
     421              : 
     422            0 :         let last_record_lsn = timeline.get_last_record_lsn();
     423            0 :         if last_record_lsn > req_lsn {
     424            0 :             tracing::info!(
     425              :                 %timeline_id,
     426              :                 %req_lsn,
     427              :                 %last_record_lsn,
     428            0 :                 "Timeline has a higher LSN than the requested one, skipping basebackup",
     429              :             );
     430            0 :             self.prepare_skip_count.inc();
     431            0 :             return Ok(());
     432            0 :         }
     433            0 : 
     434            0 :         let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
     435              : 
     436            0 :         let res = self
     437            0 :             .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
     438            0 :             .await;
     439              : 
     440            0 :         if let Err(err) = res {
     441            0 :             tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
     442              :             // Try to clean up tmp file. If we fail, the background clean up task will take care of it.
     443            0 :             match tokio::fs::remove_file(&entry_tmp_path).await {
     444            0 :                 Ok(_) => {}
     445            0 :                 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
     446            0 :                 Err(e) => {
     447            0 :                     tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
     448              :                 }
     449              :             }
     450            0 :             return Err(err);
     451            0 :         }
     452            0 : 
     453            0 :         // Move the tmp file to the final location atomically.
     454            0 :         let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
     455            0 :         tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
     456              : 
     457            0 :         let mut entries = self.entries.lock().unwrap();
     458            0 :         if let Some(old_lsn) = entries.insert(tti, req_lsn) {
     459            0 :             // Remove the old entry if it exists.
     460            0 :             self.remove_entry_sender
     461            0 :                 .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
     462            0 :                 .unwrap();
     463            0 :         }
     464            0 :         BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
     465            0 : 
     466            0 :         self.prepare_ok_count.inc();
     467            0 :         Ok(())
     468            0 :     }
     469              : 
     470              :     /// Prepares a basebackup in a temporary file.
     471            0 :     async fn prepare_basebackup_tmp(
     472            0 :         &self,
     473            0 :         emptry_tmp_path: &Utf8Path,
     474            0 :         timeline: &Arc<Timeline>,
     475            0 :         req_lsn: Lsn,
     476            0 :     ) -> anyhow::Result<()> {
     477            0 :         let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
     478            0 :         let ctx = ctx.with_scope_timeline(timeline);
     479              : 
     480            0 :         let file = tokio::fs::File::create(emptry_tmp_path).await?;
     481            0 :         let mut writer = BufWriter::new(file);
     482            0 : 
     483            0 :         let mut encoder = GzipEncoder::with_quality(
     484            0 :             &mut writer,
     485            0 :             // Level::Best because compression is not on the hot path of basebackup requests.
     486            0 :             // The decompression is almost not affected by the compression level.
     487            0 :             async_compression::Level::Best,
     488            0 :         );
     489            0 : 
     490            0 :         // We may receive a request before the WAL record is applied to the timeline.
     491            0 :         // Wait for the requested LSN to be applied.
     492            0 :         timeline
     493            0 :             .wait_lsn(
     494            0 :                 req_lsn,
     495            0 :                 crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
     496            0 :                 crate::tenant::timeline::WaitLsnTimeout::Default,
     497            0 :                 &ctx,
     498            0 :             )
     499            0 :             .await?;
     500              : 
     501            0 :         send_basebackup_tarball(
     502            0 :             &mut encoder,
     503            0 :             timeline,
     504            0 :             Some(req_lsn),
     505            0 :             None,
     506            0 :             false,
     507            0 :             false,
     508            0 :             &ctx,
     509            0 :         )
     510            0 :         .await?;
     511              : 
     512            0 :         encoder.shutdown().await?;
     513            0 :         writer.flush().await?;
     514            0 :         writer.into_inner().sync_all().await?;
     515              : 
     516            0 :         Ok(())
     517            0 :     }
     518              : }
        

Generated by: LCOV version 2.1-beta