Line data Source code
1 : use std::{collections::HashMap, sync::Arc};
2 :
3 : use anyhow::Context;
4 : use camino::{Utf8Path, Utf8PathBuf};
5 : use metrics::core::{AtomicU64, GenericCounter};
6 : use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
7 : use tokio::{
8 : io::{AsyncWriteExt, BufWriter},
9 : sync::mpsc::{Receiver, Sender, error::TrySendError},
10 : };
11 : use tokio_util::sync::CancellationToken;
12 : use utils::{
13 : id::{TenantId, TenantTimelineId, TimelineId},
14 : lsn::Lsn,
15 : shard::TenantShardId,
16 : };
17 :
18 : use crate::{
19 : basebackup::send_basebackup_tarball,
20 : context::{DownloadBehavior, RequestContext},
21 : metrics::{
22 : BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE,
23 : BASEBACKUP_CACHE_READ, BASEBACKUP_CACHE_SIZE,
24 : },
25 : task_mgr::TaskKind,
26 : tenant::{
27 : Timeline,
28 : mgr::{TenantManager, TenantSlot},
29 : },
30 : };
31 :
32 : pub struct BasebackupPrepareRequest {
33 : pub tenant_shard_id: TenantShardId,
34 : pub timeline_id: TimelineId,
35 : pub lsn: Lsn,
36 : }
37 :
38 : pub type BasebackupPrepareSender = Sender<BasebackupPrepareRequest>;
39 : pub type BasebackupPrepareReceiver = Receiver<BasebackupPrepareRequest>;
40 :
41 : #[derive(Clone)]
42 : struct CacheEntry {
43 : /// LSN at which the basebackup was taken.
44 : lsn: Lsn,
45 : /// Size of the basebackup archive in bytes.
46 : size_bytes: u64,
47 : }
48 :
49 : /// BasebackupCache stores cached basebackup archives for timelines on local disk.
50 : ///
51 : /// The main purpose of this cache is to speed up the startup process of compute nodes
52 : /// after scaling to zero.
53 : /// Thus, the basebackup is stored only for the latest LSN of the timeline and with
54 : /// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
55 : ///
56 : /// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
57 : /// generates a basebackup from the timeline in the background, and stores it on disk.
58 : ///
59 : /// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
60 : /// and ~1 RPS for get requests.
61 : pub struct BasebackupCache {
62 : data_dir: Utf8PathBuf,
63 : config: Option<BasebackupCacheConfig>,
64 :
65 : entries: std::sync::Mutex<HashMap<TenantTimelineId, CacheEntry>>,
66 :
67 : prepare_sender: BasebackupPrepareSender,
68 :
69 : read_hit_count: GenericCounter<AtomicU64>,
70 : read_miss_count: GenericCounter<AtomicU64>,
71 : read_err_count: GenericCounter<AtomicU64>,
72 :
73 : prepare_skip_count: GenericCounter<AtomicU64>,
74 : }
75 :
76 : impl BasebackupCache {
77 : /// Create a new BasebackupCache instance.
78 : /// Also returns a BasebackupPrepareReceiver which is needed to start
79 : /// the background task.
80 : /// The cache is initialized from the data_dir in the background task.
81 : /// The cache will return `None` for any get requests until the initialization is complete.
82 : /// The background task is spawned separately using [`Self::spawn_background_task`]
83 : /// to avoid a circular dependency between the cache and the tenant manager.
84 120 : pub fn new(
85 120 : data_dir: Utf8PathBuf,
86 120 : config: Option<BasebackupCacheConfig>,
87 120 : ) -> (Arc<Self>, BasebackupPrepareReceiver) {
88 120 : let chan_size = config.as_ref().map(|c| c.max_size_entries).unwrap_or(1);
89 :
90 120 : let (prepare_sender, prepare_receiver) = tokio::sync::mpsc::channel(chan_size);
91 :
92 120 : let cache = Arc::new(BasebackupCache {
93 120 : data_dir,
94 120 : config,
95 120 : entries: std::sync::Mutex::new(HashMap::new()),
96 120 : prepare_sender,
97 120 :
98 120 : read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
99 120 : read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
100 120 : read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
101 120 :
102 120 : prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
103 120 : });
104 :
105 120 : (cache, prepare_receiver)
106 120 : }
107 :
108 : /// Spawns the background task.
109 : /// The background task initializes the cache from the disk,
110 : /// processes prepare requests, and cleans up outdated cache entries.
111 : /// Noop if the cache is disabled (config is None).
112 0 : pub fn spawn_background_task(
113 0 : self: Arc<Self>,
114 0 : runtime_handle: &tokio::runtime::Handle,
115 0 : prepare_receiver: BasebackupPrepareReceiver,
116 0 : tenant_manager: Arc<TenantManager>,
117 0 : cancel: CancellationToken,
118 0 : ) {
119 0 : if let Some(config) = self.config.clone() {
120 0 : let background = BackgroundTask {
121 0 : c: self,
122 0 :
123 0 : config,
124 0 : tenant_manager,
125 0 : cancel,
126 0 :
127 0 : entry_count: 0,
128 0 : total_size_bytes: 0,
129 0 :
130 0 : prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
131 0 : prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
132 0 : prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
133 0 : };
134 0 : runtime_handle.spawn(background.run(prepare_receiver));
135 0 : }
136 0 : }
137 :
138 : /// Send a basebackup prepare request to the background task.
139 : /// The basebackup will be prepared asynchronously, it does not block the caller.
140 : /// The request will be skipped if any cache limits are exceeded.
141 0 : pub fn send_prepare(&self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, lsn: Lsn) {
142 0 : let req = BasebackupPrepareRequest {
143 0 : tenant_shard_id,
144 0 : timeline_id,
145 0 : lsn,
146 0 : };
147 :
148 0 : BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.inc();
149 0 : let res = self.prepare_sender.try_send(req);
150 :
151 0 : if let Err(e) = res {
152 0 : BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
153 0 : self.prepare_skip_count.inc();
154 0 : match e {
155 : TrySendError::Full(_) => {
156 : // Basebackup prepares are pretty rare, normally we should not hit this.
157 0 : tracing::info!(
158 : tenant_id = %tenant_shard_id.tenant_id,
159 : %timeline_id,
160 : %lsn,
161 0 : "Basebackup prepare channel is full, skipping the request"
162 : );
163 : }
164 : TrySendError::Closed(_) => {
165 : // Normal during shutdown, not critical.
166 0 : tracing::info!(
167 : tenant_id = %tenant_shard_id.tenant_id,
168 : %timeline_id,
169 : %lsn,
170 0 : "Basebackup prepare channel is closed, skipping the request"
171 : );
172 : }
173 : }
174 0 : }
175 0 : }
176 :
177 : /// Gets a basebackup entry from the cache.
178 : /// If the entry is found, opens a file with the basebackup archive and returns it.
179 : /// The open file descriptor will prevent the file system from deleting the file
180 : /// even if the entry is removed from the cache in the background.
181 0 : pub async fn get(
182 0 : &self,
183 0 : tenant_id: TenantId,
184 0 : timeline_id: TimelineId,
185 0 : lsn: Lsn,
186 0 : ) -> Option<tokio::fs::File> {
187 0 : if !self.is_enabled() {
188 0 : return None;
189 0 : }
190 :
191 : // Fast path. Check if the entry exists using the in-memory state.
192 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
193 0 : if self.entries.lock().unwrap().get(&tti).map(|e| e.lsn) != Some(lsn) {
194 0 : self.read_miss_count.inc();
195 0 : return None;
196 0 : }
197 :
198 0 : let path = self.entry_path(tenant_id, timeline_id, lsn);
199 :
200 0 : match tokio::fs::File::open(path).await {
201 0 : Ok(file) => {
202 0 : self.read_hit_count.inc();
203 0 : Some(file)
204 : }
205 0 : Err(e) => {
206 0 : if e.kind() == std::io::ErrorKind::NotFound {
207 0 : // We may end up here if the basebackup was concurrently removed by the cleanup task.
208 0 : self.read_miss_count.inc();
209 0 : } else {
210 0 : self.read_err_count.inc();
211 0 : tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
212 : }
213 0 : None
214 : }
215 : }
216 0 : }
217 :
218 0 : pub fn is_enabled(&self) -> bool {
219 0 : self.config.is_some()
220 0 : }
221 :
222 : // Private methods.
223 :
224 0 : fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
225 : // The default format for LSN is 0/ABCDEF.
226 : // The backslash is not filename friendly, so serialize it as plain hex.
227 0 : let lsn = lsn.0;
228 0 : format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
229 0 : }
230 :
231 0 : fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
232 0 : self.data_dir
233 0 : .join(Self::entry_filename(tenant_id, timeline_id, lsn))
234 0 : }
235 : }
236 :
237 : /// The background task that does the job to prepare basebackups
238 : /// and manage the cache entries on disk.
239 : /// It is a separate struct from BasebackupCache to allow holding
240 : /// a mutable reference to this state without a mutex lock,
241 : /// while BasebackupCache is referenced by the clients.
242 : struct BackgroundTask {
243 : c: Arc<BasebackupCache>,
244 :
245 : config: BasebackupCacheConfig,
246 : tenant_manager: Arc<TenantManager>,
247 : cancel: CancellationToken,
248 :
249 : /// Number of the entries in the cache.
250 : /// This counter is used for metrics and applying cache limits.
251 : /// It generally should be equal to c.entries.len(), but it's calculated
252 : /// pessimistically for abnormal situations: if we encountered some errors
253 : /// during removing the entry from disk, we won't decrement this counter to
254 : /// make sure that we don't exceed the limit with "trashed" files on the disk.
255 : /// It will also count files in the data_dir that are not valid cache entries.
256 : entry_count: usize,
257 : /// Total size of all the entries on the disk.
258 : /// This counter is used for metrics and applying cache limits.
259 : /// Similar to entry_count, it is calculated pessimistically for abnormal situations.
260 : total_size_bytes: u64,
261 :
262 : prepare_ok_count: GenericCounter<AtomicU64>,
263 : prepare_skip_count: GenericCounter<AtomicU64>,
264 : prepare_err_count: GenericCounter<AtomicU64>,
265 : }
266 :
267 : impl BackgroundTask {
268 0 : fn tmp_dir(&self) -> Utf8PathBuf {
269 0 : self.c.data_dir.join("tmp")
270 0 : }
271 :
272 0 : fn entry_tmp_path(
273 0 : &self,
274 0 : tenant_id: TenantId,
275 0 : timeline_id: TimelineId,
276 0 : lsn: Lsn,
277 0 : ) -> Utf8PathBuf {
278 0 : self.tmp_dir()
279 0 : .join(BasebackupCache::entry_filename(tenant_id, timeline_id, lsn))
280 0 : }
281 :
282 0 : fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
283 0 : let parts: Vec<&str> = filename
284 0 : .strip_prefix("basebackup_")?
285 0 : .strip_suffix(".tar.gz")?
286 0 : .split('_')
287 0 : .collect();
288 0 : if parts.len() != 3 {
289 0 : return None;
290 0 : }
291 0 : let tenant_id = parts[0].parse::<TenantId>().ok()?;
292 0 : let timeline_id = parts[1].parse::<TimelineId>().ok()?;
293 0 : let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
294 :
295 0 : Some((tenant_id, timeline_id, lsn))
296 0 : }
297 :
298 : // Recreate the tmp directory to clear all files in it.
299 0 : async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
300 0 : let tmp_dir = self.tmp_dir();
301 0 : if tmp_dir.exists() {
302 0 : tokio::fs::remove_dir_all(&tmp_dir).await?;
303 0 : }
304 0 : tokio::fs::create_dir_all(&tmp_dir).await?;
305 0 : Ok(())
306 0 : }
307 :
308 0 : async fn cleanup(&mut self) -> anyhow::Result<()> {
309 0 : self.clean_tmp_dir().await?;
310 :
311 : // Leave only up-to-date entries.
312 0 : let entries_old = self.c.entries.lock().unwrap().clone();
313 0 : let mut entries_new = HashMap::new();
314 0 : for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
315 0 : if !tenant_shard_id.is_shard_zero() {
316 0 : continue;
317 0 : }
318 0 : let TenantSlot::Attached(tenant) = tenant_slot else {
319 0 : continue;
320 : };
321 0 : let tenant_id = tenant_shard_id.tenant_id;
322 :
323 0 : for timeline in tenant.list_timelines() {
324 0 : let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
325 0 : if let Some(entry) = entries_old.get(&tti) {
326 0 : if timeline.get_last_record_lsn() <= entry.lsn {
327 0 : entries_new.insert(tti, entry.clone());
328 0 : }
329 0 : }
330 : }
331 : }
332 :
333 : // Try to remove all entries that are not up-to-date.
334 0 : for (&tti, entry) in entries_old.iter() {
335 0 : if !entries_new.contains_key(&tti) {
336 0 : self.try_remove_entry(tti.tenant_id, tti.timeline_id, entry)
337 0 : .await;
338 0 : }
339 : }
340 :
341 : // Note: BackgroundTask is the only writer for self.c.entries,
342 : // so it couldn't have been modified concurrently.
343 0 : *self.c.entries.lock().unwrap() = entries_new;
344 :
345 0 : Ok(())
346 0 : }
347 :
348 0 : async fn on_startup(&mut self) -> anyhow::Result<()> {
349 : // Create data_dir if it does not exist.
350 0 : tokio::fs::create_dir_all(&self.c.data_dir)
351 0 : .await
352 0 : .context("Failed to create basebackup cache data directory")?;
353 :
354 0 : self.clean_tmp_dir()
355 0 : .await
356 0 : .context("Failed to clean tmp directory")?;
357 :
358 : // Read existing entries from the data_dir and add them to in-memory state.
359 0 : let mut entries = HashMap::<TenantTimelineId, CacheEntry>::new();
360 0 : let mut dir = tokio::fs::read_dir(&self.c.data_dir).await?;
361 0 : while let Some(dir_entry) = dir.next_entry().await? {
362 0 : let filename = dir_entry.file_name();
363 :
364 0 : if filename == "tmp" {
365 : // Skip the tmp directory.
366 0 : continue;
367 0 : }
368 :
369 0 : let size_bytes = dir_entry
370 0 : .metadata()
371 0 : .await
372 0 : .map_err(|e| {
373 0 : anyhow::anyhow!("Failed to read metadata for file {:?}: {:?}", filename, e)
374 0 : })?
375 0 : .len();
376 :
377 0 : self.entry_count += 1;
378 0 : BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
379 :
380 0 : self.total_size_bytes += size_bytes;
381 0 : BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
382 :
383 0 : let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
384 0 : let Some((tenant_id, timeline_id, lsn)) = parsed else {
385 0 : tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
386 0 : continue;
387 : };
388 :
389 0 : let cur_entry = CacheEntry { lsn, size_bytes };
390 :
391 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
392 :
393 : use std::collections::hash_map::Entry::*;
394 :
395 0 : match entries.entry(tti) {
396 0 : Occupied(mut entry) => {
397 0 : let found_entry = entry.get();
398 : // Leave only the latest entry, remove the old one.
399 0 : if cur_entry.lsn < found_entry.lsn {
400 0 : self.try_remove_entry(tenant_id, timeline_id, &cur_entry)
401 0 : .await;
402 0 : } else if cur_entry.lsn > found_entry.lsn {
403 0 : self.try_remove_entry(tenant_id, timeline_id, found_entry)
404 0 : .await;
405 0 : entry.insert(cur_entry);
406 : } else {
407 : // Two different filenames parsed to the same timline_id and LSN.
408 : // Should never happen.
409 0 : return Err(anyhow::anyhow!(
410 0 : "Duplicate basebackup cache entry with the same LSN: {:?}",
411 0 : filename
412 0 : ));
413 : }
414 : }
415 0 : Vacant(entry) => {
416 0 : entry.insert(cur_entry);
417 0 : }
418 : }
419 : }
420 :
421 0 : *self.c.entries.lock().unwrap() = entries;
422 :
423 0 : Ok(())
424 0 : }
425 :
426 0 : async fn run(mut self, mut prepare_receiver: BasebackupPrepareReceiver) {
427 : // Panic in the background is a safe fallback.
428 : // It will drop receivers and the cache will be effectively disabled.
429 0 : self.on_startup()
430 0 : .await
431 0 : .expect("Failed to initialize basebackup cache");
432 :
433 0 : let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
434 0 : cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
435 :
436 : loop {
437 0 : tokio::select! {
438 0 : Some(req) = prepare_receiver.recv() => {
439 0 : BASEBACKUP_CACHE_PREPARE_QUEUE_SIZE.dec();
440 0 : if let Err(err) = self.prepare_basebackup(
441 0 : req.tenant_shard_id,
442 0 : req.timeline_id,
443 0 : req.lsn,
444 0 : ).await {
445 0 : tracing::info!("Failed to prepare basebackup: {:#}", err);
446 0 : self.prepare_err_count.inc();
447 0 : continue;
448 0 : }
449 : }
450 0 : _ = cleanup_ticker.tick() => {
451 0 : self.cleanup().await.unwrap_or_else(|e| {
452 0 : tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
453 0 : });
454 : }
455 0 : _ = self.cancel.cancelled() => {
456 0 : tracing::info!("BasebackupCache background task cancelled");
457 0 : break;
458 : }
459 : }
460 : }
461 0 : }
462 :
463 : /// Try to remove an entry from disk.
464 : /// The caller is responsible for removing the entry from the in-memory state.
465 : /// Updates size counters and corresponding metrics.
466 : /// Ignores the filesystem errors as not-so-important, but the size counters
467 : /// are not decremented in this case, so the file will continue to be counted
468 : /// towards the size limits.
469 0 : async fn try_remove_entry(
470 0 : &mut self,
471 0 : tenant_id: TenantId,
472 0 : timeline_id: TimelineId,
473 0 : entry: &CacheEntry,
474 0 : ) {
475 0 : let entry_path = self.c.entry_path(tenant_id, timeline_id, entry.lsn);
476 :
477 0 : match tokio::fs::remove_file(&entry_path).await {
478 0 : Ok(_) => {}
479 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
480 0 : Err(e) => {
481 0 : tracing::warn!(
482 0 : "Failed to remove basebackup cache file for tenant {} timeline {} LSN {}: {:#}",
483 : tenant_id,
484 : timeline_id,
485 : entry.lsn,
486 : e
487 : );
488 0 : return;
489 : }
490 : }
491 :
492 0 : self.entry_count -= 1;
493 0 : BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
494 :
495 0 : self.total_size_bytes -= entry.size_bytes;
496 0 : BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
497 0 : }
498 :
499 : /// Insert the cache entry into in-memory state and update the size counters.
500 : /// Assumes that the file for the entry already exists on disk.
501 : /// If the entry already exists with previous LSN, it will be removed.
502 0 : async fn upsert_entry(
503 0 : &mut self,
504 0 : tenant_id: TenantId,
505 0 : timeline_id: TimelineId,
506 0 : entry: CacheEntry,
507 0 : ) {
508 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
509 :
510 0 : self.entry_count += 1;
511 0 : BASEBACKUP_CACHE_ENTRIES.set(self.entry_count as u64);
512 :
513 0 : self.total_size_bytes += entry.size_bytes;
514 0 : BASEBACKUP_CACHE_SIZE.set(self.total_size_bytes);
515 :
516 0 : let old_entry = self.c.entries.lock().unwrap().insert(tti, entry);
517 :
518 0 : if let Some(old_entry) = old_entry {
519 0 : self.try_remove_entry(tenant_id, timeline_id, &old_entry)
520 0 : .await;
521 0 : }
522 0 : }
523 :
524 : /// Prepare a basebackup for the given timeline.
525 : ///
526 : /// If the basebackup already exists with a higher LSN or the timeline already
527 : /// has a higher last_record_lsn, skip the preparation.
528 : ///
529 : /// The basebackup is prepared in a temporary directory and then moved to the final
530 : /// location to make the operation atomic.
531 0 : async fn prepare_basebackup(
532 0 : &mut self,
533 0 : tenant_shard_id: TenantShardId,
534 0 : timeline_id: TimelineId,
535 0 : req_lsn: Lsn,
536 0 : ) -> anyhow::Result<()> {
537 0 : tracing::info!(
538 : tenant_id = %tenant_shard_id.tenant_id,
539 : %timeline_id,
540 : %req_lsn,
541 0 : "Preparing basebackup for timeline",
542 : );
543 :
544 0 : let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
545 :
546 : // TODO(diko): I don't think we will hit the limit,
547 : // but if we do, it makes sense to try to evict oldest entries. here
548 0 : if self.entry_count >= self.config.max_size_entries {
549 0 : tracing::info!(
550 : %tenant_shard_id,
551 : %timeline_id,
552 : %req_lsn,
553 0 : "Basebackup cache is full (max_size_entries), skipping basebackup",
554 : );
555 0 : self.prepare_skip_count.inc();
556 0 : return Ok(());
557 0 : }
558 :
559 0 : if self.total_size_bytes >= self.config.max_total_size_bytes {
560 0 : tracing::info!(
561 : %tenant_shard_id,
562 : %timeline_id,
563 : %req_lsn,
564 0 : "Basebackup cache is full (max_total_size_bytes), skipping basebackup",
565 : );
566 0 : self.prepare_skip_count.inc();
567 0 : return Ok(());
568 0 : }
569 :
570 : {
571 0 : let entries = self.c.entries.lock().unwrap();
572 0 : if let Some(entry) = entries.get(&tti) {
573 0 : if entry.lsn >= req_lsn {
574 0 : tracing::info!(
575 : %timeline_id,
576 : %req_lsn,
577 : %entry.lsn,
578 0 : "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
579 : );
580 0 : self.prepare_skip_count.inc();
581 0 : return Ok(());
582 0 : }
583 0 : }
584 : }
585 :
586 0 : let tenant = self
587 0 : .tenant_manager
588 0 : .get_attached_tenant_shard(tenant_shard_id)?;
589 :
590 0 : let tenant_state = tenant.current_state();
591 0 : if tenant_state != TenantState::Active {
592 0 : anyhow::bail!(
593 0 : "Tenant {} is not active, current state: {:?}",
594 : tenant_shard_id.tenant_id,
595 : tenant_state
596 : )
597 0 : }
598 :
599 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
600 :
601 0 : let last_record_lsn = timeline.get_last_record_lsn();
602 0 : if last_record_lsn > req_lsn {
603 0 : tracing::info!(
604 : %timeline_id,
605 : %req_lsn,
606 : %last_record_lsn,
607 0 : "Timeline has a higher LSN than the requested one, skipping basebackup",
608 : );
609 0 : self.prepare_skip_count.inc();
610 0 : return Ok(());
611 0 : }
612 :
613 0 : let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
614 :
615 0 : let res = self
616 0 : .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
617 0 : .await;
618 :
619 0 : let entry = match res {
620 0 : Ok(entry) => entry,
621 0 : Err(err) => {
622 0 : tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
623 : // Try to clean up tmp file. If we fail, the background clean up task will take care of it.
624 0 : match tokio::fs::remove_file(&entry_tmp_path).await {
625 0 : Ok(_) => {}
626 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
627 0 : Err(e) => {
628 0 : tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
629 : }
630 : }
631 0 : return Err(err);
632 : }
633 : };
634 :
635 : // Move the tmp file to the final location atomically.
636 : // The tmp file is fsynced, so it's guaranteed that we will not have a partial file
637 : // in the main directory.
638 : // It's not necessary to fsync the inode after renaming, because the worst case is that
639 : // the rename operation will be rolled back on the disk failure, the entry will disappear
640 : // from the main directory, and the entry access will cause a cache miss.
641 0 : let entry_path = self
642 0 : .c
643 0 : .entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
644 0 : tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
645 :
646 0 : self.upsert_entry(tenant_shard_id.tenant_id, timeline_id, entry)
647 0 : .await;
648 :
649 0 : self.prepare_ok_count.inc();
650 0 : Ok(())
651 0 : }
652 :
653 : /// Prepares a basebackup in a temporary file.
654 : /// Guarantees that the tmp file is fsynced before returning.
655 0 : async fn prepare_basebackup_tmp(
656 0 : &self,
657 0 : entry_tmp_path: &Utf8Path,
658 0 : timeline: &Arc<Timeline>,
659 0 : req_lsn: Lsn,
660 0 : ) -> anyhow::Result<CacheEntry> {
661 0 : let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
662 0 : let ctx = ctx.with_scope_timeline(timeline);
663 :
664 0 : let file = tokio::fs::File::create(entry_tmp_path).await?;
665 0 : let mut writer = BufWriter::new(file);
666 :
667 : // We may receive a request before the WAL record is applied to the timeline.
668 : // Wait for the requested LSN to be applied.
669 0 : timeline
670 0 : .wait_lsn(
671 0 : req_lsn,
672 0 : crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
673 0 : crate::tenant::timeline::WaitLsnTimeout::Default,
674 0 : &ctx,
675 0 : )
676 0 : .await?;
677 :
678 0 : send_basebackup_tarball(
679 0 : &mut writer,
680 0 : timeline,
681 0 : Some(req_lsn),
682 0 : None,
683 0 : false,
684 0 : false,
685 0 : // Level::Best because compression is not on the hot path of basebackup requests.
686 0 : // The decompression is almost not affected by the compression level.
687 0 : Some(async_compression::Level::Best),
688 0 : &ctx,
689 0 : )
690 0 : .await?;
691 :
692 0 : writer.flush().await?;
693 0 : writer.into_inner().sync_all().await?;
694 :
695 : // TODO(diko): we can count it via Writer wrapper instead of a syscall.
696 0 : let size_bytes = tokio::fs::metadata(entry_tmp_path).await?.len();
697 :
698 0 : Ok(CacheEntry {
699 0 : lsn: req_lsn,
700 0 : size_bytes,
701 0 : })
702 0 : }
703 : }
|