Line data Source code
1 : use std::{collections::HashMap, sync::Arc};
2 :
3 : use async_compression::tokio::write::GzipEncoder;
4 : use camino::{Utf8Path, Utf8PathBuf};
5 : use metrics::core::{AtomicU64, GenericCounter};
6 : use pageserver_api::{config::BasebackupCacheConfig, models::TenantState};
7 : use tokio::{
8 : io::{AsyncWriteExt, BufWriter},
9 : sync::mpsc::{UnboundedReceiver, UnboundedSender},
10 : };
11 : use tokio_util::sync::CancellationToken;
12 : use utils::{
13 : id::{TenantId, TenantTimelineId, TimelineId},
14 : lsn::Lsn,
15 : shard::TenantShardId,
16 : };
17 :
18 : use crate::{
19 : basebackup::send_basebackup_tarball,
20 : context::{DownloadBehavior, RequestContext},
21 : metrics::{BASEBACKUP_CACHE_ENTRIES, BASEBACKUP_CACHE_PREPARE, BASEBACKUP_CACHE_READ},
22 : task_mgr::TaskKind,
23 : tenant::{
24 : Timeline,
25 : mgr::{TenantManager, TenantSlot},
26 : },
27 : };
28 :
29 : pub struct BasebackupPrepareRequest {
30 : pub tenant_shard_id: TenantShardId,
31 : pub timeline_id: TimelineId,
32 : pub lsn: Lsn,
33 : }
34 :
35 : pub type BasebackupPrepareSender = UnboundedSender<BasebackupPrepareRequest>;
36 : pub type BasebackupPrepareReceiver = UnboundedReceiver<BasebackupPrepareRequest>;
37 :
38 : type BasebackupRemoveEntrySender = UnboundedSender<Utf8PathBuf>;
39 : type BasebackupRemoveEntryReceiver = UnboundedReceiver<Utf8PathBuf>;
40 :
41 : /// BasebackupCache stores cached basebackup archives for timelines on local disk.
42 : ///
43 : /// The main purpose of this cache is to speed up the startup process of compute nodes
44 : /// after scaling to zero.
45 : /// Thus, the basebackup is stored only for the latest LSN of the timeline and with
46 : /// fixed set of parameters (gzip=true, full_backup=false, replica=false, prev_lsn=none).
47 : ///
48 : /// The cache receives prepare requests through the `BasebackupPrepareSender` channel,
49 : /// generates a basebackup from the timeline in the background, and stores it on disk.
50 : ///
51 : /// Basebackup requests are pretty rare. We expect ~thousands of entries in the cache
52 : /// and ~1 RPS for get requests.
53 : pub struct BasebackupCache {
54 : data_dir: Utf8PathBuf,
55 : config: BasebackupCacheConfig,
56 : tenant_manager: Arc<TenantManager>,
57 : remove_entry_sender: BasebackupRemoveEntrySender,
58 :
59 : entries: std::sync::Mutex<HashMap<TenantTimelineId, Lsn>>,
60 :
61 : cancel: CancellationToken,
62 :
63 : read_hit_count: GenericCounter<AtomicU64>,
64 : read_miss_count: GenericCounter<AtomicU64>,
65 : read_err_count: GenericCounter<AtomicU64>,
66 :
67 : prepare_ok_count: GenericCounter<AtomicU64>,
68 : prepare_skip_count: GenericCounter<AtomicU64>,
69 : prepare_err_count: GenericCounter<AtomicU64>,
70 : }
71 :
72 : impl BasebackupCache {
73 : /// Creates a BasebackupCache and spawns the background task.
74 : /// The initialization of the cache is performed in the background and does not
75 : /// block the caller. The cache will return `None` for any get requests until
76 : /// initialization is complete.
77 0 : pub fn spawn(
78 0 : runtime_handle: &tokio::runtime::Handle,
79 0 : data_dir: Utf8PathBuf,
80 0 : config: Option<BasebackupCacheConfig>,
81 0 : prepare_receiver: BasebackupPrepareReceiver,
82 0 : tenant_manager: Arc<TenantManager>,
83 0 : cancel: CancellationToken,
84 0 : ) -> Arc<Self> {
85 0 : let (remove_entry_sender, remove_entry_receiver) = tokio::sync::mpsc::unbounded_channel();
86 0 :
87 0 : let enabled = config.is_some();
88 0 :
89 0 : let cache = Arc::new(BasebackupCache {
90 0 : data_dir,
91 0 : config: config.unwrap_or_default(),
92 0 : tenant_manager,
93 0 : remove_entry_sender,
94 0 :
95 0 : entries: std::sync::Mutex::new(HashMap::new()),
96 0 :
97 0 : cancel,
98 0 :
99 0 : read_hit_count: BASEBACKUP_CACHE_READ.with_label_values(&["hit"]),
100 0 : read_miss_count: BASEBACKUP_CACHE_READ.with_label_values(&["miss"]),
101 0 : read_err_count: BASEBACKUP_CACHE_READ.with_label_values(&["error"]),
102 0 :
103 0 : prepare_ok_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["ok"]),
104 0 : prepare_skip_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["skip"]),
105 0 : prepare_err_count: BASEBACKUP_CACHE_PREPARE.with_label_values(&["error"]),
106 0 : });
107 0 :
108 0 : if enabled {
109 0 : runtime_handle.spawn(
110 0 : cache
111 0 : .clone()
112 0 : .background(prepare_receiver, remove_entry_receiver),
113 0 : );
114 0 : }
115 :
116 0 : cache
117 0 : }
118 :
119 : /// Gets a basebackup entry from the cache.
120 : /// If the entry is found, opens a file with the basebackup archive and returns it.
121 : /// The open file descriptor will prevent the file system from deleting the file
122 : /// even if the entry is removed from the cache in the background.
123 0 : pub async fn get(
124 0 : &self,
125 0 : tenant_id: TenantId,
126 0 : timeline_id: TimelineId,
127 0 : lsn: Lsn,
128 0 : ) -> Option<tokio::fs::File> {
129 0 : // Fast path. Check if the entry exists using the in-memory state.
130 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
131 0 : if self.entries.lock().unwrap().get(&tti) != Some(&lsn) {
132 0 : self.read_miss_count.inc();
133 0 : return None;
134 0 : }
135 0 :
136 0 : let path = self.entry_path(tenant_id, timeline_id, lsn);
137 0 :
138 0 : match tokio::fs::File::open(path).await {
139 0 : Ok(file) => {
140 0 : self.read_hit_count.inc();
141 0 : Some(file)
142 : }
143 0 : Err(e) => {
144 0 : if e.kind() == std::io::ErrorKind::NotFound {
145 0 : // We may end up here if the basebackup was concurrently removed by the cleanup task.
146 0 : self.read_miss_count.inc();
147 0 : } else {
148 0 : self.read_err_count.inc();
149 0 : tracing::warn!("Unexpected error opening basebackup cache file: {:?}", e);
150 : }
151 0 : None
152 : }
153 : }
154 0 : }
155 :
156 : // Private methods.
157 :
158 0 : fn entry_filename(tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> String {
159 0 : // The default format for LSN is 0/ABCDEF.
160 0 : // The backslash is not filename friendly, so serialize it as plain hex.
161 0 : let lsn = lsn.0;
162 0 : format!("basebackup_{tenant_id}_{timeline_id}_{lsn:016X}.tar.gz")
163 0 : }
164 :
165 0 : fn entry_path(&self, tenant_id: TenantId, timeline_id: TimelineId, lsn: Lsn) -> Utf8PathBuf {
166 0 : self.data_dir
167 0 : .join(Self::entry_filename(tenant_id, timeline_id, lsn))
168 0 : }
169 :
170 0 : fn entry_tmp_path(
171 0 : &self,
172 0 : tenant_id: TenantId,
173 0 : timeline_id: TimelineId,
174 0 : lsn: Lsn,
175 0 : ) -> Utf8PathBuf {
176 0 : self.data_dir
177 0 : .join("tmp")
178 0 : .join(Self::entry_filename(tenant_id, timeline_id, lsn))
179 0 : }
180 :
181 0 : fn parse_entry_filename(filename: &str) -> Option<(TenantId, TimelineId, Lsn)> {
182 0 : let parts: Vec<&str> = filename
183 0 : .strip_prefix("basebackup_")?
184 0 : .strip_suffix(".tar.gz")?
185 0 : .split('_')
186 0 : .collect();
187 0 : if parts.len() != 3 {
188 0 : return None;
189 0 : }
190 0 : let tenant_id = parts[0].parse::<TenantId>().ok()?;
191 0 : let timeline_id = parts[1].parse::<TimelineId>().ok()?;
192 0 : let lsn = Lsn(u64::from_str_radix(parts[2], 16).ok()?);
193 :
194 0 : Some((tenant_id, timeline_id, lsn))
195 0 : }
196 :
197 0 : async fn cleanup(&self) -> anyhow::Result<()> {
198 0 : // Cleanup tmp directory.
199 0 : let tmp_dir = self.data_dir.join("tmp");
200 0 : let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
201 0 : while let Some(dir_entry) = tmp_dir.next_entry().await? {
202 0 : if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
203 0 : tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
204 0 : }
205 : }
206 :
207 : // Remove outdated entries.
208 0 : let entries_old = self.entries.lock().unwrap().clone();
209 0 : let mut entries_new = HashMap::new();
210 0 : for (tenant_shard_id, tenant_slot) in self.tenant_manager.list() {
211 0 : if !tenant_shard_id.is_shard_zero() {
212 0 : continue;
213 0 : }
214 0 : let TenantSlot::Attached(tenant) = tenant_slot else {
215 0 : continue;
216 : };
217 0 : let tenant_id = tenant_shard_id.tenant_id;
218 :
219 0 : for timeline in tenant.list_timelines() {
220 0 : let tti = TenantTimelineId::new(tenant_id, timeline.timeline_id);
221 0 : if let Some(&entry_lsn) = entries_old.get(&tti) {
222 0 : if timeline.get_last_record_lsn() <= entry_lsn {
223 0 : entries_new.insert(tti, entry_lsn);
224 0 : }
225 0 : }
226 : }
227 : }
228 :
229 0 : for (&tti, &lsn) in entries_old.iter() {
230 0 : if !entries_new.contains_key(&tti) {
231 0 : self.remove_entry_sender
232 0 : .send(self.entry_path(tti.tenant_id, tti.timeline_id, lsn))
233 0 : .unwrap();
234 0 : }
235 : }
236 :
237 0 : BASEBACKUP_CACHE_ENTRIES.set(entries_new.len() as i64);
238 0 : *self.entries.lock().unwrap() = entries_new;
239 0 :
240 0 : Ok(())
241 0 : }
242 :
243 0 : async fn on_startup(&self) -> anyhow::Result<()> {
244 0 : // Create data_dir and tmp directory if they do not exist.
245 0 : tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
246 0 : .await
247 0 : .map_err(|e| {
248 0 : anyhow::anyhow!(
249 0 : "Failed to create basebackup cache data_dir {:?}: {:?}",
250 0 : self.data_dir,
251 0 : e
252 0 : )
253 0 : })?;
254 :
255 : // Read existing entries from the data_dir and add them to in-memory state.
256 0 : let mut entries = HashMap::new();
257 0 : let mut dir = tokio::fs::read_dir(&self.data_dir).await?;
258 0 : while let Some(dir_entry) = dir.next_entry().await? {
259 0 : let filename = dir_entry.file_name();
260 0 :
261 0 : if filename == "tmp" {
262 : // Skip the tmp directory.
263 0 : continue;
264 0 : }
265 0 :
266 0 : let parsed = Self::parse_entry_filename(filename.to_string_lossy().as_ref());
267 0 : let Some((tenant_id, timeline_id, lsn)) = parsed else {
268 0 : tracing::warn!("Invalid basebackup cache file name: {:?}", filename);
269 0 : continue;
270 : };
271 :
272 0 : let tti = TenantTimelineId::new(tenant_id, timeline_id);
273 :
274 : use std::collections::hash_map::Entry::*;
275 :
276 0 : match entries.entry(tti) {
277 0 : Occupied(mut entry) => {
278 0 : let entry_lsn = *entry.get();
279 0 : // Leave only the latest entry, remove the old one.
280 0 : if lsn < entry_lsn {
281 0 : self.remove_entry_sender.send(self.entry_path(
282 0 : tenant_id,
283 0 : timeline_id,
284 0 : lsn,
285 0 : ))?;
286 0 : } else if lsn > entry_lsn {
287 0 : self.remove_entry_sender.send(self.entry_path(
288 0 : tenant_id,
289 0 : timeline_id,
290 0 : entry_lsn,
291 0 : ))?;
292 0 : entry.insert(lsn);
293 : } else {
294 : // Two different filenames parsed to the same timline_id and LSN.
295 : // Should never happen.
296 0 : return Err(anyhow::anyhow!(
297 0 : "Duplicate basebackup cache entry with the same LSN: {:?}",
298 0 : filename
299 0 : ));
300 : }
301 : }
302 0 : Vacant(entry) => {
303 0 : entry.insert(lsn);
304 0 : }
305 : }
306 : }
307 :
308 0 : BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
309 0 : *self.entries.lock().unwrap() = entries;
310 0 :
311 0 : Ok(())
312 0 : }
313 :
314 0 : async fn background(
315 0 : self: Arc<Self>,
316 0 : mut prepare_receiver: BasebackupPrepareReceiver,
317 0 : mut remove_entry_receiver: BasebackupRemoveEntryReceiver,
318 0 : ) {
319 0 : // Panic in the background is a safe fallback.
320 0 : // It will drop receivers and the cache will be effectively disabled.
321 0 : self.on_startup()
322 0 : .await
323 0 : .expect("Failed to initialize basebackup cache");
324 0 :
325 0 : let mut cleanup_ticker = tokio::time::interval(self.config.cleanup_period);
326 0 : cleanup_ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
327 :
328 : loop {
329 0 : tokio::select! {
330 0 : Some(req) = prepare_receiver.recv() => {
331 0 : if let Err(err) = self.prepare_basebackup(
332 0 : req.tenant_shard_id,
333 0 : req.timeline_id,
334 0 : req.lsn,
335 0 : ).await {
336 0 : tracing::info!("Failed to prepare basebackup: {:#}", err);
337 0 : self.prepare_err_count.inc();
338 0 : continue;
339 0 : }
340 : }
341 0 : Some(req) = remove_entry_receiver.recv() => {
342 0 : if let Err(e) = tokio::fs::remove_file(req).await {
343 0 : tracing::warn!("Failed to remove basebackup cache file: {:#}", e);
344 0 : }
345 : }
346 0 : _ = cleanup_ticker.tick() => {
347 0 : self.cleanup().await.unwrap_or_else(|e| {
348 0 : tracing::warn!("Failed to clean up basebackup cache: {:#}", e);
349 0 : });
350 0 : }
351 0 : _ = self.cancel.cancelled() => {
352 0 : tracing::info!("BasebackupCache background task cancelled");
353 0 : break;
354 0 : }
355 0 : }
356 0 : }
357 0 : }
358 :
359 : /// Prepare a basebackup for the given timeline.
360 : ///
361 : /// If the basebackup already exists with a higher LSN or the timeline already
362 : /// has a higher last_record_lsn, skip the preparation.
363 : ///
364 : /// The basebackup is prepared in a temporary directory and then moved to the final
365 : /// location to make the operation atomic.
366 0 : async fn prepare_basebackup(
367 0 : &self,
368 0 : tenant_shard_id: TenantShardId,
369 0 : timeline_id: TimelineId,
370 0 : req_lsn: Lsn,
371 0 : ) -> anyhow::Result<()> {
372 0 : tracing::info!(
373 : tenant_id = %tenant_shard_id.tenant_id,
374 : %timeline_id,
375 : %req_lsn,
376 0 : "Preparing basebackup for timeline",
377 : );
378 :
379 0 : let tti = TenantTimelineId::new(tenant_shard_id.tenant_id, timeline_id);
380 0 :
381 0 : {
382 0 : let entries = self.entries.lock().unwrap();
383 0 : if let Some(&entry_lsn) = entries.get(&tti) {
384 0 : if entry_lsn >= req_lsn {
385 0 : tracing::info!(
386 : %timeline_id,
387 : %req_lsn,
388 : %entry_lsn,
389 0 : "Basebackup entry already exists for timeline with higher LSN, skipping basebackup",
390 : );
391 0 : self.prepare_skip_count.inc();
392 0 : return Ok(());
393 0 : }
394 0 : }
395 :
396 0 : if entries.len() as i64 >= self.config.max_size_entries {
397 0 : tracing::info!(
398 : %timeline_id,
399 : %req_lsn,
400 0 : "Basebackup cache is full, skipping basebackup",
401 : );
402 0 : self.prepare_skip_count.inc();
403 0 : return Ok(());
404 0 : }
405 : }
406 :
407 0 : let tenant = self
408 0 : .tenant_manager
409 0 : .get_attached_tenant_shard(tenant_shard_id)?;
410 :
411 0 : let tenant_state = tenant.current_state();
412 0 : if tenant_state != TenantState::Active {
413 0 : anyhow::bail!(
414 0 : "Tenant {} is not active, current state: {:?}",
415 0 : tenant_shard_id.tenant_id,
416 0 : tenant_state
417 0 : )
418 0 : }
419 :
420 0 : let timeline = tenant.get_timeline(timeline_id, true)?;
421 :
422 0 : let last_record_lsn = timeline.get_last_record_lsn();
423 0 : if last_record_lsn > req_lsn {
424 0 : tracing::info!(
425 : %timeline_id,
426 : %req_lsn,
427 : %last_record_lsn,
428 0 : "Timeline has a higher LSN than the requested one, skipping basebackup",
429 : );
430 0 : self.prepare_skip_count.inc();
431 0 : return Ok(());
432 0 : }
433 0 :
434 0 : let entry_tmp_path = self.entry_tmp_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
435 :
436 0 : let res = self
437 0 : .prepare_basebackup_tmp(&entry_tmp_path, &timeline, req_lsn)
438 0 : .await;
439 :
440 0 : if let Err(err) = res {
441 0 : tracing::info!("Failed to prepare basebackup tmp file: {:#}", err);
442 : // Try to clean up tmp file. If we fail, the background clean up task will take care of it.
443 0 : match tokio::fs::remove_file(&entry_tmp_path).await {
444 0 : Ok(_) => {}
445 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
446 0 : Err(e) => {
447 0 : tracing::info!("Failed to remove basebackup tmp file: {:?}", e);
448 : }
449 : }
450 0 : return Err(err);
451 0 : }
452 0 :
453 0 : // Move the tmp file to the final location atomically.
454 0 : let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
455 0 : tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
456 :
457 0 : let mut entries = self.entries.lock().unwrap();
458 0 : if let Some(old_lsn) = entries.insert(tti, req_lsn) {
459 0 : // Remove the old entry if it exists.
460 0 : self.remove_entry_sender
461 0 : .send(self.entry_path(tenant_shard_id.tenant_id, timeline_id, old_lsn))
462 0 : .unwrap();
463 0 : }
464 0 : BASEBACKUP_CACHE_ENTRIES.set(entries.len() as i64);
465 0 :
466 0 : self.prepare_ok_count.inc();
467 0 : Ok(())
468 0 : }
469 :
470 : /// Prepares a basebackup in a temporary file.
471 0 : async fn prepare_basebackup_tmp(
472 0 : &self,
473 0 : emptry_tmp_path: &Utf8Path,
474 0 : timeline: &Arc<Timeline>,
475 0 : req_lsn: Lsn,
476 0 : ) -> anyhow::Result<()> {
477 0 : let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
478 0 : let ctx = ctx.with_scope_timeline(timeline);
479 :
480 0 : let file = tokio::fs::File::create(emptry_tmp_path).await?;
481 0 : let mut writer = BufWriter::new(file);
482 0 :
483 0 : let mut encoder = GzipEncoder::with_quality(
484 0 : &mut writer,
485 0 : // Level::Best because compression is not on the hot path of basebackup requests.
486 0 : // The decompression is almost not affected by the compression level.
487 0 : async_compression::Level::Best,
488 0 : );
489 0 :
490 0 : // We may receive a request before the WAL record is applied to the timeline.
491 0 : // Wait for the requested LSN to be applied.
492 0 : timeline
493 0 : .wait_lsn(
494 0 : req_lsn,
495 0 : crate::tenant::timeline::WaitLsnWaiter::BaseBackupCache,
496 0 : crate::tenant::timeline::WaitLsnTimeout::Default,
497 0 : &ctx,
498 0 : )
499 0 : .await?;
500 :
501 0 : send_basebackup_tarball(
502 0 : &mut encoder,
503 0 : timeline,
504 0 : Some(req_lsn),
505 0 : None,
506 0 : false,
507 0 : false,
508 0 : &ctx,
509 0 : )
510 0 : .await?;
511 :
512 0 : encoder.shutdown().await?;
513 0 : writer.flush().await?;
514 0 : writer.into_inner().sync_all().await?;
515 :
516 0 : Ok(())
517 0 : }
518 : }
|