Line data Source code
//! This module contains the global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
//! All timelines should always be present in this map; this is achieved by loading them
//! all from disk on startup and keeping them in memory.
4 :
5 : use std::collections::HashMap;
6 : use std::str::FromStr;
7 : use std::sync::{Arc, Mutex};
8 : use std::time::{Duration, Instant};
9 :
10 : use anyhow::{Context, Result, bail};
11 : use camino::Utf8PathBuf;
12 : use camino_tempfile::Utf8TempDir;
13 : use safekeeper_api::membership::Configuration;
14 : use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
15 : use safekeeper_api::{ServerInfo, membership};
16 : use tokio::fs;
17 : use tracing::*;
18 : use utils::crashsafe::{durable_rename, fsync_async_opt};
19 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
20 : use utils::lsn::Lsn;
21 :
22 : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
23 : use crate::http::routes::DeleteOrExcludeError;
24 : use crate::rate_limit::RateLimiter;
25 : use crate::state::TimelinePersistentState;
26 : use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
27 : use crate::timelines_set::TimelinesSet;
28 : use crate::wal_backup::WalBackup;
29 : use crate::wal_storage::Storage;
30 : use crate::{SafeKeeperConf, control_file, wal_storage};
31 :
/// Entry of the global timelines map: either a loaded timeline, or a
/// placeholder marking that the timeline is currently being created
/// (inserted and later replaced/removed by `GlobalTimelines::load_temp_timeline`).
#[derive(Clone)]
enum GlobalMapTimeline {
    /// Reserved slot: creation of this timeline has started but not finished.
    CreationInProgress,
    /// Loaded timeline. It may already be cancelled; readers check `is_cancelled()`.
    Timeline(Arc<Timeline>),
}
39 :
/// State guarded by the mutex inside `GlobalTimelines`.
struct GlobalTimelinesState {
    /// All known timelines, including placeholders for timelines being created.
    timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,

    /// A tombstone indicates this timeline used to exist and has been deleted. These are used to
    /// prevent on-demand timeline creation from recreating deleted timelines. This is only
    /// soft-enforced, as this map is dropped on restart. The value is the deletion time; entries
    /// older than the TTL passed to `GlobalTimelines::housekeeping` are pruned there.
    tombstones: HashMap<TenantTimelineId, Instant>,

    /// Safekeeper configuration, handed to every timeline.
    conf: Arc<SafeKeeperConf>,
    /// Shared set handed to every timeline in `bootstrap` (presumably the set of timelines
    /// active on the storage broker — confirm against `TimelinesSet`).
    broker_active_set: Arc<TimelinesSet>,
    /// Rate limiter shared by all timelines; rebuilt in `GlobalTimelines::init` from
    /// `partial_backup_concurrency` and `DEFAULT_EVICTION_CONCURRENCY`.
    global_rate_limiter: RateLimiter,
    /// WAL backup handle shared with every timeline.
    wal_backup: Arc<WalBackup>,
}
53 :
54 : impl GlobalTimelinesState {
55 : /// Get dependencies for a timeline constructor.
56 0 : fn get_dependencies(
57 0 : &self,
58 0 : ) -> (
59 0 : Arc<SafeKeeperConf>,
60 0 : Arc<TimelinesSet>,
61 0 : RateLimiter,
62 0 : Arc<WalBackup>,
63 0 : ) {
64 0 : (
65 0 : self.conf.clone(),
66 0 : self.broker_active_set.clone(),
67 0 : self.global_rate_limiter.clone(),
68 0 : self.wal_backup.clone(),
69 0 : )
70 0 : }
71 :
72 : /// Get timeline from the map. Returns error if timeline doesn't exist or
73 : /// creation is in progress.
74 0 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
75 0 : match self.timelines.get(ttid).cloned() {
76 0 : Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
77 : Some(GlobalMapTimeline::CreationInProgress) => {
78 0 : Err(TimelineError::CreationInProgress(*ttid))
79 : }
80 0 : None => Err(TimelineError::NotFound(*ttid)),
81 : }
82 0 : }
83 :
84 0 : fn delete(&mut self, ttid: TenantTimelineId) {
85 0 : self.timelines.remove(&ttid);
86 0 : self.tombstones.insert(ttid, Instant::now());
87 0 : }
88 : }
89 :
/// A struct used to manage access to the global timelines map.
///
/// All state lives behind a synchronous `std::sync::Mutex`. Methods take it only
/// for short critical sections and release it before any `.await`.
pub struct GlobalTimelines {
    state: Mutex<GlobalTimelinesState>,
}
94 :
95 : impl GlobalTimelines {
96 : /// Create a new instance of the global timelines map.
97 0 : pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
98 0 : Self {
99 0 : state: Mutex::new(GlobalTimelinesState {
100 0 : timelines: HashMap::new(),
101 0 : tombstones: HashMap::new(),
102 0 : conf,
103 0 : broker_active_set: Arc::new(TimelinesSet::default()),
104 0 : global_rate_limiter: RateLimiter::new(1, 1),
105 0 : wal_backup,
106 0 : }),
107 0 : }
108 0 : }
109 :
110 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
111 0 : pub async fn init(&self) -> Result<()> {
112 0 : // clippy isn't smart enough to understand that drop(state) releases the
113 0 : // lock, so use explicit block
114 0 : let tenants_dir = {
115 0 : let mut state = self.state.lock().unwrap();
116 0 : state.global_rate_limiter = RateLimiter::new(
117 0 : state.conf.partial_backup_concurrency,
118 0 : DEFAULT_EVICTION_CONCURRENCY,
119 0 : );
120 0 :
121 0 : // Iterate through all directories and load tenants for all directories
122 0 : // named as a valid tenant_id.
123 0 : state.conf.workdir.clone()
124 0 : };
125 0 : let mut tenant_count = 0;
126 0 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
127 0 : .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
128 : {
129 0 : match &tenants_dir_entry {
130 0 : Ok(tenants_dir_entry) => {
131 0 : if let Ok(tenant_id) =
132 0 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
133 : {
134 0 : tenant_count += 1;
135 0 : self.load_tenant_timelines(tenant_id).await?;
136 0 : }
137 : }
138 0 : Err(e) => error!(
139 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
140 : tenants_dir_entry, tenants_dir, e
141 : ),
142 : }
143 : }
144 :
145 0 : info!(
146 0 : "found {} tenants directories, successfully loaded {} timelines",
147 0 : tenant_count,
148 0 : self.state.lock().unwrap().timelines.len()
149 : );
150 0 : Ok(())
151 0 : }
152 :
153 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
154 : /// errors if any.
155 : ///
156 : /// It is async, but self.state lock is sync and there is no important
157 : /// reason to make it async (it is always held for a short while), so we
158 : /// just lock and unlock it for each timeline -- this function is called
159 : /// during init when nothing else is running, so this is fine.
160 0 : async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
161 0 : let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
162 0 : let state = self.state.lock().unwrap();
163 0 : state.get_dependencies()
164 0 : };
165 0 :
166 0 : let timelines_dir = get_tenant_dir(&conf, &tenant_id);
167 0 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
168 0 : .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
169 : {
170 0 : match &timelines_dir_entry {
171 0 : Ok(timeline_dir_entry) => {
172 0 : if let Ok(timeline_id) =
173 0 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
174 : {
175 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
176 0 : match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
177 0 : Ok(tli) => {
178 0 : let mut shared_state = tli.write_shared_state().await;
179 0 : self.state
180 0 : .lock()
181 0 : .unwrap()
182 0 : .timelines
183 0 : .insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
184 0 : tli.bootstrap(
185 0 : &mut shared_state,
186 0 : &conf,
187 0 : broker_active_set.clone(),
188 0 : partial_backup_rate_limiter.clone(),
189 0 : wal_backup.clone(),
190 0 : );
191 : }
192 : // If we can't load a timeline, it's most likely because of a corrupted
193 : // directory. We will log an error and won't allow to delete/recreate
194 : // this timeline. The only way to fix this timeline is to repair manually
195 : // and restart the safekeeper.
196 0 : Err(e) => error!(
197 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
198 : timeline_id, tenant_id, e
199 : ),
200 : }
201 0 : }
202 : }
203 0 : Err(e) => error!(
204 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
205 : timelines_dir_entry, timelines_dir, e
206 : ),
207 : }
208 : }
209 :
210 0 : Ok(())
211 0 : }
212 :
213 : /// Get the number of timelines in the map.
214 0 : pub fn timelines_count(&self) -> usize {
215 0 : self.state.lock().unwrap().timelines.len()
216 0 : }
217 :
218 : /// Get the global safekeeper config.
219 0 : pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
220 0 : self.state.lock().unwrap().conf.clone()
221 0 : }
222 :
223 0 : pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
224 0 : self.state.lock().unwrap().broker_active_set.clone()
225 0 : }
226 :
227 0 : pub fn get_wal_backup(&self) -> Arc<WalBackup> {
228 0 : self.state.lock().unwrap().wal_backup.clone()
229 0 : }
230 :
231 : /// Create a new timeline with the given id. If the timeline already exists, returns
232 : /// an existing timeline.
233 0 : pub(crate) async fn create(
234 0 : &self,
235 0 : ttid: TenantTimelineId,
236 0 : mconf: Configuration,
237 0 : server_info: ServerInfo,
238 0 : start_lsn: Lsn,
239 0 : commit_lsn: Lsn,
240 0 : ) -> Result<Arc<Timeline>> {
241 0 : let (conf, _, _, _) = {
242 0 : let state = self.state.lock().unwrap();
243 0 : if let Ok(timeline) = state.get(&ttid) {
244 : // Timeline already exists, return it.
245 0 : return Ok(timeline);
246 0 : }
247 0 :
248 0 : if state.tombstones.contains_key(&ttid) {
249 0 : anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
250 0 : }
251 0 :
252 0 : state.get_dependencies()
253 0 : };
254 0 :
255 0 : info!("creating new timeline {}", ttid);
256 :
257 : // Do on disk initialization in tmp dir.
258 0 : let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
259 :
260 : // TODO: currently we create only cfile. It would be reasonable to
261 : // immediately initialize first WAL segment as well.
262 0 : let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
263 0 : control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
264 0 : let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
265 0 : Ok(timeline)
266 0 : }
267 :
268 : /// Move timeline from a temp directory to the main storage, and load it to
269 : /// the global map. Creating timeline in this way ensures atomicity: rename
270 : /// is atomic, so either move of the whole datadir succeeds or it doesn't,
271 : /// but corrupted data dir shouldn't be possible.
272 : ///
273 : /// We'd like to avoid holding map lock while doing IO, so it's a 3 step
274 : /// process:
275 : /// 1) check the global map that timeline doesn't exist and mark that we're
276 : /// creating it;
277 : /// 2) move the directory and load the timeline
278 : /// 3) take lock again and insert the timeline into the global map.
279 0 : pub async fn load_temp_timeline(
280 0 : &self,
281 0 : ttid: TenantTimelineId,
282 0 : tmp_path: &Utf8PathBuf,
283 0 : check_tombstone: bool,
284 0 : ) -> Result<Arc<Timeline>> {
285 : // Check for existence and mark that we're creating it.
286 0 : let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
287 0 : let mut state = self.state.lock().unwrap();
288 0 : match state.timelines.get(&ttid) {
289 : Some(GlobalMapTimeline::CreationInProgress) => {
290 0 : bail!(TimelineError::CreationInProgress(ttid));
291 : }
292 : Some(GlobalMapTimeline::Timeline(_)) => {
293 0 : bail!(TimelineError::AlreadyExists(ttid));
294 : }
295 0 : _ => {}
296 0 : }
297 0 : if check_tombstone {
298 0 : if state.tombstones.contains_key(&ttid) {
299 0 : anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
300 0 : }
301 : } else {
302 : // We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
303 : // that the human doing this manual intervention knows what they are doing, and remove its tombstone.
304 0 : if state.tombstones.remove(&ttid).is_some() {
305 0 : warn!("un-deleted timeline {ttid}");
306 0 : }
307 : }
308 0 : state
309 0 : .timelines
310 0 : .insert(ttid, GlobalMapTimeline::CreationInProgress);
311 0 : state.get_dependencies()
312 0 : };
313 0 :
314 0 : // Do the actual move and reflect the result in the map.
315 0 : match GlobalTimelines::install_temp_timeline(
316 0 : ttid,
317 0 : tmp_path,
318 0 : conf.clone(),
319 0 : wal_backup.clone(),
320 0 : )
321 0 : .await
322 : {
323 0 : Ok(timeline) => {
324 0 : let mut timeline_shared_state = timeline.write_shared_state().await;
325 0 : let mut state = self.state.lock().unwrap();
326 0 : assert!(matches!(
327 0 : state.timelines.get(&ttid),
328 : Some(GlobalMapTimeline::CreationInProgress)
329 : ));
330 :
331 0 : state
332 0 : .timelines
333 0 : .insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
334 0 : drop(state);
335 0 : timeline.bootstrap(
336 0 : &mut timeline_shared_state,
337 0 : &conf,
338 0 : broker_active_set,
339 0 : partial_backup_rate_limiter,
340 0 : wal_backup,
341 0 : );
342 0 : drop(timeline_shared_state);
343 0 : Ok(timeline)
344 : }
345 0 : Err(e) => {
346 0 : // Init failed, remove the marker from the map
347 0 : let mut state = self.state.lock().unwrap();
348 0 : assert!(matches!(
349 0 : state.timelines.get(&ttid),
350 : Some(GlobalMapTimeline::CreationInProgress)
351 : ));
352 0 : state.timelines.remove(&ttid);
353 0 : Err(e)
354 : }
355 : }
356 0 : }
357 :
358 : /// Main part of load_temp_timeline: do the move and load.
359 0 : async fn install_temp_timeline(
360 0 : ttid: TenantTimelineId,
361 0 : tmp_path: &Utf8PathBuf,
362 0 : conf: Arc<SafeKeeperConf>,
363 0 : wal_backup: Arc<WalBackup>,
364 0 : ) -> Result<Arc<Timeline>> {
365 0 : let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
366 0 : let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
367 0 :
368 0 : // We must have already checked that timeline doesn't exist in the map,
369 0 : // but there might be existing datadir: if timeline is corrupted it is
370 0 : // not loaded. We don't want to overwrite such a dir, so check for its
371 0 : // existence.
372 0 : match fs::metadata(&timeline_path).await {
373 : Ok(_) => {
374 : // Timeline directory exists on disk, we should leave state unchanged
375 : // and return error.
376 0 : bail!(TimelineError::Invalid(ttid));
377 : }
378 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
379 0 : Err(e) => {
380 0 : return Err(e.into());
381 : }
382 : }
383 :
384 0 : info!(
385 0 : "moving timeline {} from {} to {}",
386 : ttid, tmp_path, timeline_path
387 : );
388 :
389 : // Now it is safe to move the timeline directory to the correct
390 : // location. First, create tenant directory. Ignore error if it already
391 : // exists.
392 0 : if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
393 0 : if e.kind() != std::io::ErrorKind::AlreadyExists {
394 0 : return Err(e.into());
395 0 : }
396 0 : }
397 : // fsync it
398 0 : fsync_async_opt(&tenant_path, !conf.no_sync).await?;
399 : // and its creation
400 0 : fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
401 :
402 : // Do the move.
403 0 : durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
404 :
405 0 : Timeline::load_timeline(conf, ttid, wal_backup)
406 0 : }
407 :
408 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
409 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
410 : /// i.e. loaded in memory and not cancelled.
411 0 : pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
412 0 : let tli_res = {
413 0 : let state = self.state.lock().unwrap();
414 0 : state.get(&ttid)
415 0 : };
416 0 : match tli_res {
417 0 : Ok(tli) => {
418 0 : if tli.is_cancelled() {
419 0 : return Err(TimelineError::Cancelled(ttid));
420 0 : }
421 0 : Ok(tli)
422 : }
423 0 : _ => tli_res,
424 : }
425 0 : }
426 :
427 : /// Returns all timelines. This is used for background timeline processes.
428 0 : pub fn get_all(&self) -> Vec<Arc<Timeline>> {
429 0 : let global_lock = self.state.lock().unwrap();
430 0 : global_lock
431 0 : .timelines
432 0 : .values()
433 0 : .filter_map(|t| match t {
434 0 : GlobalMapTimeline::Timeline(t) => {
435 0 : if t.is_cancelled() {
436 0 : None
437 : } else {
438 0 : Some(t.clone())
439 : }
440 : }
441 0 : _ => None,
442 0 : })
443 0 : .collect()
444 0 : }
445 :
446 : /// Returns statistics about timeline counts
447 0 : pub fn get_timeline_counts(&self) -> SafekeeperUtilization {
448 0 : let global_lock = self.state.lock().unwrap();
449 0 : let timeline_count = global_lock
450 0 : .timelines
451 0 : .values()
452 0 : .filter(|t| match t {
453 0 : GlobalMapTimeline::CreationInProgress => false,
454 0 : GlobalMapTimeline::Timeline(t) => !t.is_cancelled(),
455 0 : })
456 0 : .count() as u64;
457 0 : SafekeeperUtilization { timeline_count }
458 0 : }
459 :
460 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
461 : /// and that's why it can return cancelled timelines, to retry deleting them.
462 0 : fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
463 0 : let global_lock = self.state.lock().unwrap();
464 0 : global_lock
465 0 : .timelines
466 0 : .values()
467 0 : .filter_map(|t| match t {
468 0 : GlobalMapTimeline::Timeline(t) => Some(t.clone()),
469 0 : _ => None,
470 0 : })
471 0 : .filter(|t| t.ttid.tenant_id == tenant_id)
472 0 : .collect()
473 0 : }
474 :
/// Delete timeline, only locally on this node or globally (also cleaning
/// remote storage WAL), depending on `action` value.
///
/// If a tombstone already exists for `ttid`, returns `dir_existed: false`
/// without doing any work. On success the timeline is removed from the map
/// and a tombstone is recorded.
///
/// # Errors
/// - `DeleteOrExcludeError::Conflict` if `action` is `Exclude` and the
///   timeline's current membership generation is greater than the requested one;
/// - errors propagated from `Timeline::delete` / `delete_dir`. On any error no
///   tombstone is recorded, so the call can be retried.
pub(crate) async fn delete_or_exclude(
    &self,
    ttid: &TenantTimelineId,
    action: DeleteOrExclude,
) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
    let tli_res = {
        let state = self.state.lock().unwrap();

        if state.tombstones.contains_key(ttid) {
            // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
            info!("Timeline {ttid} was already deleted");
            return Ok(TimelineDeleteResult { dir_existed: false });
        }

        state.get(ttid)
    };

    let result = match tli_res {
        Ok(timeline) => {
            info!("deleting timeline {}, action={:?}", ttid, action);

            // If node is getting excluded, check the generation first.
            // Then, while holding the (shared state read) lock, cancel the timeline; it will be
            // unusable after this point, and if node is added back first
            // deletion must be completed and node seeded anew.
            //
            // We would like to avoid holding the lock while waiting for the
            // gate to finish as this is deadlock prone, so for actual
            // deletion we take it a second time.
            if let DeleteOrExclude::Exclude(ref mconf) = action {
                let shared_state = timeline.read_shared_state().await;
                if shared_state.sk.state().mconf.generation > mconf.generation {
                    return Err(DeleteOrExcludeError::Conflict {
                        requested: mconf.clone(),
                        current: shared_state.sk.state().mconf.clone(),
                    });
                }
                timeline.cancel().await;
            } else {
                timeline.cancel().await;
            }

            timeline.close().await;

            info!("timeline {ttid} shut down for deletion");

            // Take a lock and finish the deletion holding this mutex.
            let mut shared_state = timeline.write_shared_state().await;

            // Only `Delete` also removes remote WAL; `DeleteLocal` and `Exclude` are local-only.
            let only_local = !matches!(action, DeleteOrExclude::Delete);
            let dir_existed = timeline.delete(&mut shared_state, only_local).await?;

            Ok(TimelineDeleteResult { dir_existed })
        }
        Err(_) => {
            // Timeline is not in memory, but it may still exist on disk in a broken state.
            //
            // NOTE(review): this arm also handles `TimelineError::CreationInProgress`. If
            // `load_temp_timeline` is concurrently installing this timeline, this removes its dir
            // and the `delete` call below drops the `CreationInProgress` marker. That appears to
            // make `load_temp_timeline`'s `assert!` fire while it holds the `self.state` mutex
            // guard, which would poison the mutex. Consider rejecting the in-progress case here.
            let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
            let dir_existed = delete_dir(&dir_path).await?;

            Ok(TimelineDeleteResult { dir_existed })
        }
    };

    // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
    // are used to prevent still-running computes from re-creating the same timeline when they send data,
    // and to speed up repeated deletion calls by avoiding re-listing objects.
    self.state.lock().unwrap().delete(*ttid);

    result
}
547 :
548 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
549 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
550 : /// created simultaneously. In that case the function will return error and the caller should
551 : /// retry tenant deletion again later.
552 : ///
553 : /// If only_local, doesn't remove WAL segments in remote storage.
554 0 : pub async fn delete_all_for_tenant(
555 0 : &self,
556 0 : tenant_id: &TenantId,
557 0 : action: DeleteOrExclude,
558 0 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
559 0 : info!("deleting all timelines for tenant {}", tenant_id);
560 0 : let to_delete = self.get_all_for_tenant(*tenant_id);
561 0 :
562 0 : let mut err = None;
563 0 :
564 0 : let mut deleted = HashMap::new();
565 0 : for tli in &to_delete {
566 0 : match self.delete_or_exclude(&tli.ttid, action.clone()).await {
567 0 : Ok(result) => {
568 0 : deleted.insert(tli.ttid, result);
569 0 : }
570 0 : Err(e) => {
571 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
572 : // Save error to return later.
573 0 : err = Some(e);
574 : }
575 : }
576 : }
577 :
578 : // If there was an error, return it.
579 0 : if let Some(e) = err {
580 0 : return Err(anyhow::Error::from(e));
581 0 : }
582 0 :
583 0 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
584 0 : // Note that we could concurrently create new timelines while we were deleting them,
585 0 : // so the directory may be not empty. In this case timelines will have bad state
586 0 : // and timeline background jobs can panic.
587 0 : let tenant_dir = get_tenant_dir(self.state.lock().unwrap().conf.as_ref(), tenant_id);
588 0 : delete_dir(&tenant_dir).await?;
589 :
590 0 : Ok(deleted)
591 0 : }
592 :
593 0 : pub fn housekeeping(&self, tombstone_ttl: &Duration) {
594 0 : let mut state = self.state.lock().unwrap();
595 0 :
596 0 : // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
597 0 : // timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
598 0 : // may recreate a deleted timeline.
599 0 : let now = Instant::now();
600 0 : state
601 0 : .tombstones
602 0 : .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
603 0 : }
604 : }
605 :
/// Action for `GlobalTimelines::delete_or_exclude`.
#[derive(Clone, Debug)]
pub enum DeleteOrExclude {
    /// Delete timeline globally: local data and WAL in remote storage.
    Delete,
    /// Legacy mode until we fully migrate to generations: like `Exclude`, deletes
    /// the timeline only locally, but ignores the generation number.
    DeleteLocal,
    /// This node is getting excluded: delete the timeline locally, unless the
    /// timeline's current membership generation is greater than the given one.
    Exclude(membership::Configuration),
}
617 :
618 : /// Create temp directory for a new timeline. It needs to be located on the same
619 : /// filesystem as the rest of the timelines. It will be automatically deleted when
620 : /// Utf8TempDir goes out of scope.
621 0 : pub async fn create_temp_timeline_dir(
622 0 : conf: &SafeKeeperConf,
623 0 : ttid: TenantTimelineId,
624 0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
625 0 : let temp_base = conf.workdir.join("tmp");
626 0 :
627 0 : tokio::fs::create_dir_all(&temp_base).await?;
628 :
629 0 : let tli_dir = camino_tempfile::Builder::new()
630 0 : .suffix("_temptli")
631 0 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
632 0 : .tempdir_in(temp_base)?;
633 :
634 0 : let tli_dir_path = tli_dir.path().to_path_buf();
635 0 :
636 0 : Ok((tli_dir, tli_dir_path))
637 0 : }
638 :
639 : /// Do basic validation of a temp timeline, before moving it to the global map.
640 0 : pub async fn validate_temp_timeline(
641 0 : conf: &SafeKeeperConf,
642 0 : ttid: TenantTimelineId,
643 0 : path: &Utf8PathBuf,
644 0 : ) -> Result<(Lsn, Lsn)> {
645 0 : let control_path = path.join("safekeeper.control");
646 :
647 0 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
648 0 : if control_store.server.wal_seg_size == 0 {
649 0 : bail!("wal_seg_size is not set");
650 0 : }
651 :
652 0 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
653 :
654 0 : let commit_lsn = control_store.commit_lsn;
655 0 : let flush_lsn = wal_store.flush_lsn();
656 0 :
657 0 : Ok((commit_lsn, flush_lsn))
658 0 : }
|