Line data Source code
1 : use std::{collections::HashMap, sync::Arc};
2 :
3 : use anyhow::Context;
4 : use async_compression::tokio::write::GzipEncoder;
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use metrics::core::{AtomicU64, GenericCounter};
7 : use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
8 : use tokio::{
9 : io::{AsyncWriteExt, BufWriter},
10 : sync::mpsc::{UnboundedReceiver, UnboundedSender},
11 : };
12 : use tokio_util::sync::CancellationToken;
13 : use utils::{
14 : id::{TenantId, TenantTimelineId, TimelineId},
15 : lsn::Lsn,
16 : shard::TenantShardId,
17 : };
18 :
19 : use crate::{
20 : basebackup::send_basebackup_tarball,
21 : context::{DownloadBehavior, RequestContext},
22 : metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
23 : task_mgr::TaskKind,
24 : tenant::{
25 : Timeline,
26 : mgr::{TenantManager, TenantSlot},
27 : },
28 : };
29 :
30 : pub struct BasebackupPrepareRequest {
31 : pub tenant_shard_id: TenantShardId,
32 : pub timeline_id: TimelineId,
33 : pub lsn: Lsn,
34 : }
35 :
36 : pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
37 : pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
38 :
39 : type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
40 : type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
41 :
42 : /// BasebackupCache stores cached basebackup archives for timelines on local disk.
43 : ///
44 : /// The main purpose of this cache is to speed up the startup process of compute nodes
45 : /// after scaling to zero.
46 : /// Thus, the basebackup is stored only for the latest LSN of the timeline and with
47 : /// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
48 : ///
49 : /// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
50 : /// generates a basebackup from the timeline in the background, and stores it on disk.
51 : ///
52 : /// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
53 : /// and ~1 RPS for get requests.
54 : pub struct BasebackupCache {
55 : data_dir: Utf8PathBuf,
56 : config: BasebackupCacheConfig,
57 : tenant_manager: Arc<TenantManager>,
58 : remove_entry_sender: BasebackupRemoveEntrySender,
59 :
60 : entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
61 :
62 : cancel: CancellationToken,
63 :
64 : read_hit_count: GenericCounter<AtomicU64>,
65 : read_miss_count: GenericCounter<AtomicU64>,
66 : read_err_count: GenericCounter<AtomicU64>,
67 :
68 : prepare_ok_count: GenericCounter<AtomicU64>,
69 : prepare_skip_count: GenericCounter<AtomicU64>,
70 : prepare_err_count: GenericCounter<AtomicU64>,
71 : }
72 :
73 : impl BasebackupCache {
74 : /// Creates a BasebackupCache and spawns the background task.
75 : /// The initialization of the cache is performed in the background and does not
76 : /// block the caller. The cache will return `None` for any get requests until
77 : /// initialization is complete.
78 0 : pub fn spawn(
79 0 : runtime_handle: &tokio::runtime::Handle,
80 0 : data_dir: Utf8PathBuf,
81 0 : config: Option<BasebackupCacheConfig>,
82 0 : prepare_receiver: BasebackupPrepareReceiver,
83 0 : tenant_manager: Arc<TenantManager>,
84 0 : cancel: CancellationToken,
85 0 : ) -> Arc<Self> {
86 0 : let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
87 0 :
88 0 : let enabled = config.is_some();
89 0 :
90 0 : let cache = Arc::new(BasebackupCache {
91 0 : data_dir,
92 0 : config: config.unwrap_or_default(),
93 0 : tenant_manager,
94 0 : remove_entry_sender,
95 0 :
96 0 : entries: std::sync::Mutex::new(HashMap::new()),
97 0 :
98 0 : cancel,
99 0 :
100 0 : read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
101 0 : read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
102 0 : read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
103 0 :
104 0 : prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
105 0 : prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
106 0 : prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
107 0 : });
108 0 :
109 0 : if enabled {
110 0 : runtime_handle.spawn(
111 0 : cache
112 0 : .clone()
113 0 : .background(prepare_receiver, remove_entry_receiver),
114 0 : );
115 0 : }
116 :
117 0 : cache
118 0 : }
119 :
120 : /// Gets a basebackup entry from the cache.
121 : /// If the entry is found, opens a file with the basebackup archive and returns it.
122 : /// The open file descriptor will prevent the file system from deleting the file
123 : /// even if the entry is removed from the cache in the background.
124 0 : pub async fn get(
125 0 : &self,
126 0 : tenant_id: TenantId,
127 0 : timeline_id: TimelineId,
128 0 : lsn: Lsn,
129 0 : ) -> Option<tokio::fs::File> {
130 0 : // Fast path. Check if the entry exists using the in-memory state.
131 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
132 0 : if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
133 0 : self.read_miss_count.inc();
134 0 : return None;
135 0 : }
136 0 :
137 0 : let path = self.entry_path(tenant_id, timeline_id, lsn);
138 0 :
139 0 : match tokio::fs::File::open(path).await {
140 0 : Ok(file) => {
141 0 : self.read_hit_count.inc();
142 0 : Some(file)
143 : }
144 0 : Err(e) => {
145 0 : if e.kind() == std::io::ErrorKind::NotFound {
146 0 : // We may end up here if the basebackup was concurrently removed by the cleanup task.
147 0 : self.read_miss_count.inc();
148 0 : } else {
149 0 : self.read_err_count.inc();
150 0 : tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
151 : }
152 0 : None
153 : }
154 : }
155 0 : }
156 :
157 : // Private methods.
158 :
159 0 : fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
160 0 : // The default format for LSN is 0/ABCDEF.
161 0 : // The backslash is not filename friendly, so serialize it as plain hex.
162 0 : let lsn = lsn.0;
163 0 : format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
164 0 : }
165 :
166 0 : fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
167 0 : self.data_dir
168 0 : .join(Self::entry_filename(tenant_id, timeline_id, lsn))
169 0 : }
170 :
171 0 : fn tmp_dir(&self) -> Utf8PathBuf {
172 0 : self.data_dir.join("tmp")
173 0 : }
174 :
175 0 : fn entry_tmp_path(
176 0 : &self,
177 0 : tenant_id: TenantId,
178 0 : timeline_id: TimelineId,
179 0 : lsn: Lsn,
180 0 : ) -> Utf8PathBuf {
181 0 : self.tmp_dir()
182 0 : .join(Self::entry_filename(tenant_id, timeline_id, lsn))
183 0 : }
184 :
185 0 : fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
186 0 : let parts: Vec<&str> = filename
187 0 : .strip_prefix("basebackup_")?
188 0 : .strip_suffix(".tar.gz")?
189 0 : .split('_')
190 0 : .collect();
191 0 : if parts.len() != 3 {
192 0 : return None;
193 0 : }
194 0 : let tenant_id = parts[0].parse::<TenantId>().ok()?;
195 0 : let timeline_id = parts[1].parse::<TimelineId>().ok()?;
196 0 : let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
197 :
198 0 : Some((tenant_id, timeline_id, lsn))
199 0 : }
200 :
201 : // Recreate the tmp directory to clear all files in it.
202 0 : async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
203 0 : let tmp_dir = self.tmp_dir();
204 0 : if tmp_dir.exists() {
205 0 : tokio::fs::remove_dir_all(&tmp_dir).await?;
206 0 : }
207 0 : tokio::fs::create_dir_all(&tmp_dir).await?;
208 0 : Ok(())
209 0 : }
210 :
211 0 : async fn cleanup(&self) -> anyhow::Result<()> {
212 0 : self.clean_tmp_dir().await?;
213 :
214 : // Remove outdated entries.
215 0 : let entries_old = self.entries.lock().unwrap().clone();
216 0 : let mut entries_new = HashMap::new();
217 0 : for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
218 0 : if !tenant_shard_id.is_shard_zero() {
219 0 : continue;
220 0 : }
221 0 : let TenantSlot::Attached(tenant) = tenant_slot else {
222 0 : continue;
223 : };
224 0 : let tenant_id = tenant_shard_id.tenant_id;
225 :
226 0 : for timeline in tenant.list_timelines() {
227 0 : let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
228 0 : if let Some(&entry_lsn) = entries_old.get(&tti) {
229 0 : if timeline.get_last_record_lsn() <= entry_lsn {
230 0 : entries_new.insert(tti, entry_lsn);
231 0 : }
232 0 : }
233 : }
234 : }
235 :
236 0 : for (&tti, &lsn) in entries_old.iter() {
237 0 : if !entries_new.contains_key(&tti) {
238 0 : self.remove_entry_sender
239 0 : .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
240 0 : .unwrap();
241 0 : }
242 : }
243 :
244 0 : BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
245 0 : *self.entries.lock().unwrap() = entries_new;
246 0 :
247 0 : Ok(())
248 0 : }
249 :
250 0 : async fn on_startup(&self) -> anyhow::Result<()> {
251 0 : // Create data_dir if it does not exist.
252 0 : tokio::fs::create_dir_all(&self.data_dir)
253 0 : .await
254 0 : .context("Failed to create basebackup cache data directory")?;
255 :
256 0 : self.clean_tmp_dir()
257 0 : .await
258 0 : .context("Failed to clean tmp directory")?;
259 :
260 : // Read existing entries from the data_dir and add them to in-memory state.
261 0 : let mut entries = HashMap::new();
262 0 : let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
263 0 : while let Some(dir_entry) = dir.next_entry().await? {
264 0 : let filename = dir_entry.file_name();
265 0 :
266 0 : if filename == "tmp" {
267 : // Skip the tmp directory.
268 0 : continue;
269 0 : }
270 0 :
271 0 : let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
272 0 : let Some((tenant_id, timeline_id, lsn)) = parsed else {
273 0 : tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
274 0 : continue;
275 : };
276 :
277 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
278 :
279 : use std::collections::hash_map::Entry::*;
280 :
281 0 : match entries.entry(tti) {
282 0 : Occupied(mut entry) => {
283 0 : let entry_lsn = *entry.get();
284 0 : // Leave only the latest entry, remove the old one.
285 0 : if lsn < entry_lsn {
286 0 : self.remove_entry_sender.send(self.entry_path(
287 0 : tenant_id,
288 0 : timeline_id,
289 0 : lsn,
290 0 : ))?;
291 0 : } else if lsn > entry_lsn {
292 0 : self.remove_entry_sender.send(self.entry_path(
293 0 : tenant_id,
294 0 : timeline_id,
295 0 : entry_lsn,
296 0 : ))?;
297 0 : entry.insert(lsn);
298 : } else {
299 : // Two different filenames parsed to the same timline_id and LSN.
300 : // Should never happen.
301 0 : return Err(anyhow::anyhow!(
302 0 : "Duplicate basebackup cache entry with the same LSN: {:?}",
303 0 : filename
304 0 : ));
305 : }
306 : }
307 0 : Vacant(entry) => {
308 0 : entry.insert(lsn);
309 0 : }
310 : }
311 : }
312 :
313 0 : BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
314 0 : *self.entries.lock().unwrap() = entries;
315 0 :
316 0 : Ok(())
317 0 : }
318 :
319 0 : async fn background(
320 0 : self: Arc<Self>,
321 0 : mut prepare_receiver: BasebackupPrepareReceiver,
322 0 : mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
323 0 : ) {
324 0 : // Panic in the background is a safe fallback.
325 0 : // It will drop receivers and the cache will be effectively disabled.
326 0 : self.on_startup()
327 0 : .await
328 0 : .expect("Failed to initialize basebackup cache");
329 0 :
330 0 : let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
331 0 : cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
332 :
333 : loop {
334 0 : tokio::select! {
335 0 : Some(req) = prepare_receiver.recv() => {
336 0 : if let Err(err) = self.prepare_basebackup(
337 0 : req.tenant_shard_id,
338 0 : req.timeline_id,
339 0 : req.lsn,
340 0 : ).await {
341 0 : tracing::info!("Failed to prepare basebackup: {:#}", err);
342 0 : self.prepare_err_count.inc();
343 0 : continue;
344 0 : }
345 : }
346 0 : Some(req) = remove_entry_receiver.recv() => {
347 0 : if let Err(e) = tokio::fs::remove_file(req).await {
348 0 : tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
349 0 : }
350 : }
351 0 : _ = cleanup_ticker.tick() => {
352 0 : self.cleanup().await.unwrap_or_else(|e| {
353 0 : tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
354 0 : });
355 0 : }
356 0 : _ = self.cancel.cancelled() => {
357 0 : tracing::info!("BasebackupCache background task cancelled");
358 0 : break;
359 0 : }
360 0 : }
361 0 : }
362 0 : }
363 :
364 : /// Prepare a basebackup for the given timeline.
365 : ///
366 : /// If the basebackup already exists with a higher LSN or the timeline already
367 : /// has a higher last_record_lsn, skip the preparation.
368 : ///
369 : /// The basebackup is prepared in a temporary directory and then moved to the final
370 : /// location to make the operation atomic.
371 0 : async fn prepare_basebackup(
372 0 : &self,
373 0 : tenant_shard_id: TenantShardId,
374 0 : timeline_id: TimelineId,
375 0 : req_lsn: Lsn,
376 0 : ) -> anyhow::Result<()> {
377 0 : tracing::info!(
378 : tenant_id = %tenant_shard_id.tenant_id,
379 : %timeline_id,
380 : %req_lsn,
381 0 : "Preparing basebackup for timeline",
382 : );
383 :
384 0 : let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
385 0 :
386 0 : {
387 0 : let entries = self.entries.lock().unwrap();
388 0 : if let Some(&entry_lsn) = entries.get(&tti) {
389 0 : if entry_lsn >= req_lsn {
390 0 : tracing::info!(
391 : %timeline_id,
392 : %req_lsn,
393 : %entry_lsn,
394 0 : "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
395 : );
396 0 : self.prepare_skip_count.inc();
397 0 : return Ok(());
398 0 : }
399 0 : }
400 :
401 0 : if entries.len() as i64 >= self.config.max_size_entries {
402 0 : tracing::info!(
403 : %timeline_id,
404 : %req_lsn,
405 0 : "Basebackup cache is full, skipping basebackup",
406 : );
407 0 : self.prepare_skip_count.inc();
408 0 : return Ok(());
409 0 : }
410 : }
411 :
412 0 : let tenant = self
413 0 : .tenant_manager
414 0 : .get_attached_tenant_shard(tenant_shard_id)?;
415 :
416 0 : let tenant_state = tenant.current_state();
417 0 : if tenant_state != TenantState::Active {
418 0 : anyhow::bail!(
419 0 : "Tenant {} is not active, current state: {:?}",
420 0 : tenant_shard_id.tenant_id,
421 0 : tenant_state
422 0 : )
423 0 : }
424 :
425 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
426 :
427 0 : let last_record_lsn = timeline.get_last_record_lsn();
428 0 : if last_record_lsn > req_lsn {
429 0 : tracing::info!(
430 : %timeline_id,
431 : %req_lsn,
432 : %last_record_lsn,
433 0 : "Timeline has a higher LSN than the requested one, skipping basebackup",
434 : );
435 0 : self.prepare_skip_count.inc();
436 0 : return Ok(());
437 0 : }
438 0 :
439 0 : let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
440 :
441 0 : let res = self
442 0 : .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
443 0 : .await;
444 :
445 0 : if let Err(err) = res {
446 0 : tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
447 : // Try to clean up tmp file. If we fail, the background clean up task will take care of it.
448 0 : match tokio::fs::remove_file(&entry_tmp_path).await {
449 0 : Ok(_) => {}
450 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
451 0 : Err(e) => {
452 0 : tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
453 : }
454 : }
455 0 : return Err(err);
456 0 : }
457 0 :
458 0 : // Move the tmp file to the final location atomically.
459 0 : // The tmp file is fsynced, so it's guaranteed that we will not have a partial file
460 0 : // in the main directory.
461 0 : // It's not necessary to fsync the inode after renaming, because the worst case is that
462 0 : // the rename operation will be rolled back on the disk failure, the entry will disappear
463 0 : // from the main directory, and the entry access will cause a cache miss.
464 0 : let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
465 0 : tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
466 :
467 0 : let mut entries = self.entries.lock().unwrap();
468 0 : if let Some(old_lsn) = entries.insert(tti, req_lsn) {
469 0 : // Remove the old entry if it exists.
470 0 : self.remove_entry_sender
471 0 : .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
472 0 : .unwrap();
473 0 : }
474 0 : BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
475 0 :
476 0 : self.prepare_ok_count.inc();
477 0 : Ok(())
478 0 : }
479 :
480 : /// Prepares a basebackup in a temporary file.
481 : /// Guarantees that the tmp file is fsynced before returning.
482 0 : async fn prepare_basebackup_tmp(
483 0 : &self,
484 0 : entry_tmp_path: &Utf8Path,
485 0 : timeline: &Arc<Timeline>,
486 0 : req_lsn: Lsn,
487 0 : ) -> anyhow::Result<()> {
488 0 : let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
489 0 : let ctx = ctx.with_scope_timeline(timeline);
490 :
491 0 : let file = tokio::fs::File::create(entry_tmp_path).await?;
492 0 : let mut writer = BufWriter::new(file);
493 0 :
494 0 : let mut encoder = GzipEncoder::with_quality(
495 0 : &mut writer,
496 0 : // Level::Best because compression is not on the hot path of basebackup requests.
497 0 : // The decompression is almost not affected by the compression level.
498 0 : async_compression::Level::Best,
499 0 : );
500 0 :
501 0 : // We may receive a request before the WAL record is applied to the timeline.
502 0 : // Wait for the requested LSN to be applied.
503 0 : timeline
504 0 : .wait_lsn(
505 0 : req_lsn,
506 0 : crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
507 0 : crate::tenant::timeline::WaitLsnTimeout::Default,
508 0 : &ctx,
509 0 : )
510 0 : .await?;
511 :
512 0 : send_basebackup_tarball(
513 0 : &mut encoder,
514 0 : timeline,
515 0 : Some(req_lsn),
516 0 : None,
517 0 : false,
518 0 : false,
519 0 : &ctx,
520 0 : )
521 0 : .await?;
522 :
523 0 : encoder.shutdown().await?;
524 0 : writer.flush().await?;
525 0 : writer.into_inner().sync_all().await?;
526 :
527 0 : Ok(())
528 0 : }
529 : }
|