Line data Source code
1 : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
2 : //! All timelines should always be present in this map, this is done by loading them
3 : //! all from the disk on startup and keeping them in memory.
4 :
5 : use std::collections::HashMap;
6 : use std::str::FromStr;
7 : use std::sync::{Arc, Mutex};
8 : use std::time::{Duration, Instant};
9 :
10 : use anyhow::{Context, Result, bail};
11 : use camino::Utf8PathBuf;
12 : use camino_tempfile::Utf8TempDir;
13 : use safekeeper_api::membership::{Configuration, SafekeeperGeneration};
14 : use safekeeper_api::models::{SafekeeperUtilization, TimelineDeleteResult};
15 : use safekeeper_api::{ServerInfo, membership};
16 : use tokio::fs;
17 : use tracing::*;
18 : use utils::crashsafe::{durable_rename, fsync_async_opt};
19 : use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
20 : use utils::lsn::Lsn;
21 :
22 : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
23 : use crate::http::routes::DeleteOrExcludeError;
24 : use crate::rate_limit::RateLimiter;
25 : use crate::state::TimelinePersistentState;
26 : use crate::timeline::{Timeline, TimelineError, delete_dir, get_tenant_dir, get_timeline_dir};
27 : use crate::timelines_set::TimelinesSet;
28 : use crate::wal_backup::WalBackup;
29 : use crate::wal_storage::Storage;
30 : use crate::{SafeKeeperConf, control_file, wal_storage};
31 :
32 : // Timeline entry in the global map: either a ready timeline, or mark that it is
33 : // being created.
34 : #[derive(Clone)]
35 : enum GlobalMapTimeline {
36 : CreationInProgress,
37 : Timeline(Arc<Timeline>),
38 : }
39 :
40 : struct GlobalTimelinesState {
41 : timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
42 :
43 : /// A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
44 : /// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
45 : /// this map is dropped on restart.
46 : /// The timeline might also be locally deleted (excluded) via safekeeper migration algorithm. In that case,
47 : /// the tombsone contains the corresponding safekeeper generation. The pull_timeline requests with
48 : /// higher generation ignore such tombstones and can recreate the timeline.
49 : timeline_tombstones: HashMap<TenantTimelineId, TimelineTombstone>,
50 : /// A tombstone indicates that the tenant used to exist has been deleted.
51 : /// These are created only by tenant_delete requests. They are always valid regardless of the
52 : /// request generation.
53 : /// This is only soft-enforced, as this map is dropped on restart.
54 : tenant_tombstones: HashMap<TenantId, Instant>,
55 :
56 : conf: Arc<SafeKeeperConf>,
57 : broker_active_set: Arc<TimelinesSet>,
58 : global_rate_limiter: RateLimiter,
59 : wal_backup: Arc<WalBackup>,
60 : }
61 :
62 : impl GlobalTimelinesState {
63 : /// Get dependencies for a timeline constructor.
64 0 : fn get_dependencies(
65 0 : &self,
66 0 : ) -> (
67 0 : Arc<SafeKeeperConf>,
68 0 : Arc<TimelinesSet>,
69 0 : RateLimiter,
70 0 : Arc<WalBackup>,
71 0 : ) {
72 0 : (
73 0 : self.conf.clone(),
74 0 : self.broker_active_set.clone(),
75 0 : self.global_rate_limiter.clone(),
76 0 : self.wal_backup.clone(),
77 0 : )
78 0 : }
79 :
80 : /// Get timeline from the map. Returns error if timeline doesn't exist or
81 : /// creation is in progress.
82 0 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
83 0 : match self.timelines.get(ttid).cloned() {
84 0 : Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
85 : Some(GlobalMapTimeline::CreationInProgress) => {
86 0 : Err(TimelineError::CreationInProgress(*ttid))
87 : }
88 : None => {
89 0 : if self.has_tombstone(ttid, None) {
90 0 : Err(TimelineError::Deleted(*ttid))
91 : } else {
92 0 : Err(TimelineError::NotFound(*ttid))
93 : }
94 : }
95 : }
96 0 : }
97 :
98 0 : fn has_timeline_tombstone(
99 0 : &self,
100 0 : ttid: &TenantTimelineId,
101 0 : generation: Option<SafekeeperGeneration>,
102 0 : ) -> bool {
103 0 : if let Some(generation) = generation {
104 0 : self.timeline_tombstones
105 0 : .get(ttid)
106 0 : .is_some_and(|t| t.is_valid(generation))
107 : } else {
108 0 : self.timeline_tombstones.contains_key(ttid)
109 : }
110 0 : }
111 :
112 0 : fn has_tenant_tombstone(&self, tenant_id: &TenantId) -> bool {
113 0 : self.tenant_tombstones.contains_key(tenant_id)
114 0 : }
115 :
116 : /// Check if the state has a tenant or a timeline tombstone.
117 : /// If `generation` is provided, check only for timeline tombsotnes with same or higher generation.
118 : /// If `generation` is `None`, check for any timeline tombstone.
119 : /// Tenant tombstones are checked regardless of the generation.
120 0 : fn has_tombstone(
121 0 : &self,
122 0 : ttid: &TenantTimelineId,
123 0 : generation: Option<SafekeeperGeneration>,
124 0 : ) -> bool {
125 0 : self.has_timeline_tombstone(ttid, generation) || self.has_tenant_tombstone(&ttid.tenant_id)
126 0 : }
127 :
128 : /// Removes timeline tombstone for the given timeline ID.
129 : /// Returns `true` if there have been actual changes.
130 0 : fn remove_timeline_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
131 0 : self.timeline_tombstones.remove(ttid).is_some()
132 0 : }
133 :
134 0 : fn delete(&mut self, ttid: TenantTimelineId, generation: Option<SafekeeperGeneration>) {
135 0 : self.timelines.remove(&ttid);
136 0 : self.timeline_tombstones
137 0 : .insert(ttid, TimelineTombstone::new(generation));
138 0 : }
139 :
140 0 : fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
141 0 : self.tenant_tombstones.insert(tenant_id, Instant::now());
142 0 : }
143 : }
144 :
145 : /// A struct used to manage access to the global timelines map.
146 : pub struct GlobalTimelines {
147 : state: Mutex<GlobalTimelinesState>,
148 : }
149 :
150 : impl GlobalTimelines {
151 : /// Create a new instance of the global timelines map.
152 0 : pub fn new(conf: Arc<SafeKeeperConf>, wal_backup: Arc<WalBackup>) -> Self {
153 0 : Self {
154 0 : state: Mutex::new(GlobalTimelinesState {
155 0 : timelines: HashMap::new(),
156 0 : timeline_tombstones: HashMap::new(),
157 0 : tenant_tombstones: HashMap::new(),
158 0 : conf,
159 0 : broker_active_set: Arc::new(TimelinesSet::default()),
160 0 : global_rate_limiter: RateLimiter::new(1, 1),
161 0 : wal_backup,
162 0 : }),
163 0 : }
164 0 : }
165 :
166 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
167 0 : pub async fn init(&self) -> Result<()> {
168 : // clippy isn't smart enough to understand that drop(state) releases the
169 : // lock, so use explicit block
170 0 : let tenants_dir = {
171 0 : let mut state = self.state.lock().unwrap();
172 0 : state.global_rate_limiter = RateLimiter::new(
173 0 : state.conf.partial_backup_concurrency,
174 : DEFAULT_EVICTION_CONCURRENCY,
175 : );
176 :
177 : // Iterate through all directories and load tenants for all directories
178 : // named as a valid tenant_id.
179 0 : state.conf.workdir.clone()
180 : };
181 0 : let mut tenant_count = 0;
182 0 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
183 0 : .with_context(|| format!("failed to list tenants dir {tenants_dir}"))?
184 : {
185 0 : match &tenants_dir_entry {
186 0 : Ok(tenants_dir_entry) => {
187 0 : if let Ok(tenant_id) =
188 0 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
189 : {
190 0 : tenant_count += 1;
191 0 : self.load_tenant_timelines(tenant_id).await?;
192 0 : }
193 : }
194 0 : Err(e) => error!(
195 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
196 : tenants_dir_entry, tenants_dir, e
197 : ),
198 : }
199 : }
200 :
201 0 : info!(
202 0 : "found {} tenants directories, successfully loaded {} timelines",
203 : tenant_count,
204 0 : self.state.lock().unwrap().timelines.len()
205 : );
206 0 : Ok(())
207 0 : }
208 :
209 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
210 : /// errors if any.
211 : ///
212 : /// It is async, but self.state lock is sync and there is no important
213 : /// reason to make it async (it is always held for a short while), so we
214 : /// just lock and unlock it for each timeline -- this function is called
215 : /// during init when nothing else is running, so this is fine.
216 0 : async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
217 0 : let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
218 0 : let state = self.state.lock().unwrap();
219 0 : state.get_dependencies()
220 0 : };
221 :
222 0 : let timelines_dir = get_tenant_dir(&conf, &tenant_id);
223 0 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
224 0 : .with_context(|| format!("failed to list timelines dir {timelines_dir}"))?
225 : {
226 0 : match &timelines_dir_entry {
227 0 : Ok(timeline_dir_entry) => {
228 0 : if let Ok(timeline_id) =
229 0 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
230 : {
231 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
232 0 : match Timeline::load_timeline(conf.clone(), ttid, wal_backup.clone()) {
233 0 : Ok(tli) => {
234 0 : let mut shared_state = tli.write_shared_state().await;
235 0 : self.state
236 0 : .lock()
237 0 : .unwrap()
238 0 : .timelines
239 0 : .insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
240 0 : tli.bootstrap(
241 0 : &mut shared_state,
242 0 : &conf,
243 0 : broker_active_set.clone(),
244 0 : partial_backup_rate_limiter.clone(),
245 0 : wal_backup.clone(),
246 : );
247 : }
248 : // If we can't load a timeline, it's most likely because of a corrupted
249 : // directory. We will log an error and won't allow to delete/recreate
250 : // this timeline. The only way to fix this timeline is to repair manually
251 : // and restart the safekeeper.
252 0 : Err(e) => error!(
253 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
254 : timeline_id, tenant_id, e
255 : ),
256 : }
257 0 : }
258 : }
259 0 : Err(e) => error!(
260 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
261 : timelines_dir_entry, timelines_dir, e
262 : ),
263 : }
264 : }
265 :
266 0 : Ok(())
267 0 : }
268 :
269 : /// Get the number of timelines in the map.
270 0 : pub fn timelines_count(&self) -> usize {
271 0 : self.state.lock().unwrap().timelines.len()
272 0 : }
273 :
274 : /// Get the global safekeeper config.
275 0 : pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
276 0 : self.state.lock().unwrap().conf.clone()
277 0 : }
278 :
279 0 : pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
280 0 : self.state.lock().unwrap().broker_active_set.clone()
281 0 : }
282 :
283 0 : pub fn get_wal_backup(&self) -> Arc<WalBackup> {
284 0 : self.state.lock().unwrap().wal_backup.clone()
285 0 : }
286 :
287 : /// Create a new timeline with the given id. If the timeline already exists, returns
288 : /// an existing timeline.
289 0 : pub(crate) async fn create(
290 0 : &self,
291 0 : ttid: TenantTimelineId,
292 0 : mconf: Configuration,
293 0 : server_info: ServerInfo,
294 0 : start_lsn: Lsn,
295 0 : commit_lsn: Lsn,
296 0 : ) -> Result<Arc<Timeline>> {
297 0 : let generation = Some(mconf.generation);
298 :
299 0 : let (conf, _, _, _) = {
300 0 : let state = self.state.lock().unwrap();
301 0 : if let Ok(timeline) = state.get(&ttid) {
302 : // Timeline already exists, return it.
303 0 : return Ok(timeline);
304 0 : }
305 :
306 0 : if state.has_tombstone(&ttid, generation) {
307 0 : anyhow::bail!(TimelineError::Deleted(ttid));
308 0 : }
309 :
310 0 : state.get_dependencies()
311 : };
312 :
313 0 : info!("creating new timeline {}", ttid);
314 :
315 : // Do on disk initialization in tmp dir.
316 0 : let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
317 :
318 : // TODO: currently we create only cfile. It would be reasonable to
319 : // immediately initialize first WAL segment as well.
320 0 : let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
321 0 : control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
322 0 : let timeline = self
323 0 : .load_temp_timeline(ttid, &tmp_dir_path, generation)
324 0 : .await?;
325 0 : Ok(timeline)
326 0 : }
327 :
328 : /// Move timeline from a temp directory to the main storage, and load it to
329 : /// the global map. Creating timeline in this way ensures atomicity: rename
330 : /// is atomic, so either move of the whole datadir succeeds or it doesn't,
331 : /// but corrupted data dir shouldn't be possible.
332 : ///
333 : /// We'd like to avoid holding map lock while doing IO, so it's a 3 step
334 : /// process:
335 : /// 1) check the global map that timeline doesn't exist and mark that we're
336 : /// creating it;
337 : /// 2) move the directory and load the timeline
338 : /// 3) take lock again and insert the timeline into the global map.
339 0 : pub async fn load_temp_timeline(
340 0 : &self,
341 0 : ttid: TenantTimelineId,
342 0 : tmp_path: &Utf8PathBuf,
343 0 : generation: Option<SafekeeperGeneration>,
344 0 : ) -> Result<Arc<Timeline>> {
345 : // Check for existence and mark that we're creating it.
346 0 : let (conf, broker_active_set, partial_backup_rate_limiter, wal_backup) = {
347 0 : let mut state = self.state.lock().unwrap();
348 0 : match state.timelines.get(&ttid) {
349 : Some(GlobalMapTimeline::CreationInProgress) => {
350 0 : bail!(TimelineError::CreationInProgress(ttid));
351 : }
352 : Some(GlobalMapTimeline::Timeline(_)) => {
353 0 : bail!(TimelineError::AlreadyExists(ttid));
354 : }
355 0 : _ => {}
356 : }
357 :
358 0 : if state.has_tombstone(&ttid, generation) {
359 : // If the timeline is deleted, we refuse to recreate it.
360 : // This is a safeguard against accidentally overwriting a timeline that was deleted
361 : // by concurrent request.
362 0 : anyhow::bail!(TimelineError::Deleted(ttid));
363 0 : }
364 :
365 : // We might have an outdated tombstone with the older generation.
366 : // Remove it unconditionally.
367 0 : state.remove_timeline_tombstone(&ttid);
368 :
369 0 : state
370 0 : .timelines
371 0 : .insert(ttid, GlobalMapTimeline::CreationInProgress);
372 0 : state.get_dependencies()
373 : };
374 :
375 : // Do the actual move and reflect the result in the map.
376 0 : match GlobalTimelines::install_temp_timeline(
377 0 : ttid,
378 0 : tmp_path,
379 0 : conf.clone(),
380 0 : wal_backup.clone(),
381 : )
382 0 : .await
383 : {
384 0 : Ok(timeline) => {
385 0 : let mut timeline_shared_state = timeline.write_shared_state().await;
386 0 : let mut state = self.state.lock().unwrap();
387 0 : assert!(matches!(
388 0 : state.timelines.get(&ttid),
389 : Some(GlobalMapTimeline::CreationInProgress)
390 : ));
391 :
392 0 : state
393 0 : .timelines
394 0 : .insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
395 0 : drop(state);
396 0 : timeline.bootstrap(
397 0 : &mut timeline_shared_state,
398 0 : &conf,
399 0 : broker_active_set,
400 0 : partial_backup_rate_limiter,
401 0 : wal_backup,
402 : );
403 0 : drop(timeline_shared_state);
404 0 : Ok(timeline)
405 : }
406 0 : Err(e) => {
407 : // Init failed, remove the marker from the map
408 0 : let mut state = self.state.lock().unwrap();
409 0 : assert!(matches!(
410 0 : state.timelines.get(&ttid),
411 : Some(GlobalMapTimeline::CreationInProgress)
412 : ));
413 0 : state.timelines.remove(&ttid);
414 0 : Err(e)
415 : }
416 : }
417 0 : }
418 :
419 : /// Main part of load_temp_timeline: do the move and load.
420 0 : async fn install_temp_timeline(
421 0 : ttid: TenantTimelineId,
422 0 : tmp_path: &Utf8PathBuf,
423 0 : conf: Arc<SafeKeeperConf>,
424 0 : wal_backup: Arc<WalBackup>,
425 0 : ) -> Result<Arc<Timeline>> {
426 0 : let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
427 0 : let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
428 :
429 : // We must have already checked that timeline doesn't exist in the map,
430 : // but there might be existing datadir: if timeline is corrupted it is
431 : // not loaded. We don't want to overwrite such a dir, so check for its
432 : // existence.
433 0 : match fs::metadata(&timeline_path).await {
434 : Ok(_) => {
435 : // Timeline directory exists on disk, we should leave state unchanged
436 : // and return error.
437 0 : bail!(TimelineError::Invalid(ttid));
438 : }
439 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
440 0 : Err(e) => {
441 0 : return Err(e.into());
442 : }
443 : }
444 :
445 0 : info!(
446 0 : "moving timeline {} from {} to {}",
447 : ttid, tmp_path, timeline_path
448 : );
449 :
450 : // Now it is safe to move the timeline directory to the correct
451 : // location. First, create tenant directory. Ignore error if it already
452 : // exists.
453 0 : if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
454 0 : if e.kind() != std::io::ErrorKind::AlreadyExists {
455 0 : return Err(e.into());
456 0 : }
457 0 : }
458 : // fsync it
459 0 : fsync_async_opt(&tenant_path, !conf.no_sync).await?;
460 : // and its creation
461 0 : fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
462 :
463 : // Do the move.
464 0 : durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
465 :
466 0 : Timeline::load_timeline(conf, ttid, wal_backup)
467 0 : }
468 :
469 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
470 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
471 : /// i.e. loaded in memory and not cancelled.
472 0 : pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
473 0 : let tli_res = {
474 0 : let state = self.state.lock().unwrap();
475 0 : state.get(&ttid)
476 : };
477 0 : match tli_res {
478 0 : Ok(tli) => {
479 0 : if tli.is_cancelled() {
480 0 : return Err(TimelineError::Cancelled(ttid));
481 0 : }
482 0 : Ok(tli)
483 : }
484 0 : _ => tli_res,
485 : }
486 0 : }
487 :
488 : /// Returns all timelines. This is used for background timeline processes.
489 0 : pub fn get_all(&self) -> Vec<Arc<Timeline>> {
490 0 : let global_lock = self.state.lock().unwrap();
491 0 : global_lock
492 0 : .timelines
493 0 : .values()
494 0 : .filter_map(|t| match t {
495 0 : GlobalMapTimeline::Timeline(t) => {
496 0 : if t.is_cancelled() {
497 0 : None
498 : } else {
499 0 : Some(t.clone())
500 : }
501 : }
502 0 : _ => None,
503 0 : })
504 0 : .collect()
505 0 : }
506 :
507 : /// Returns statistics about timeline counts
508 0 : pub fn get_timeline_counts(&self) -> SafekeeperUtilization {
509 0 : let global_lock = self.state.lock().unwrap();
510 0 : let timeline_count = global_lock
511 0 : .timelines
512 0 : .values()
513 0 : .filter(|t| match t {
514 0 : GlobalMapTimeline::CreationInProgress => false,
515 0 : GlobalMapTimeline::Timeline(t) => !t.is_cancelled(),
516 0 : })
517 0 : .count() as u64;
518 0 : SafekeeperUtilization { timeline_count }
519 0 : }
520 :
521 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
522 : /// and that's why it can return cancelled timelines, to retry deleting them.
523 0 : fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
524 0 : let global_lock = self.state.lock().unwrap();
525 0 : global_lock
526 0 : .timelines
527 0 : .values()
528 0 : .filter_map(|t| match t {
529 0 : GlobalMapTimeline::Timeline(t) => Some(t.clone()),
530 0 : _ => None,
531 0 : })
532 0 : .filter(|t| t.ttid.tenant_id == tenant_id)
533 0 : .collect()
534 0 : }
535 :
536 : /// Delete timeline, only locally on this node or globally (also cleaning
537 : /// remote storage WAL), depending on `action` value.
538 0 : pub(crate) async fn delete_or_exclude(
539 0 : &self,
540 0 : ttid: &TenantTimelineId,
541 0 : action: DeleteOrExclude,
542 0 : ) -> Result<TimelineDeleteResult, DeleteOrExcludeError> {
543 0 : let generation = match &action {
544 0 : DeleteOrExclude::Delete | DeleteOrExclude::DeleteLocal => None,
545 0 : DeleteOrExclude::Exclude(mconf) => Some(mconf.generation),
546 : };
547 :
548 0 : let tli_res = {
549 0 : let state = self.state.lock().unwrap();
550 :
551 : // Do NOT check tenant tombstones here: those were set earlier
552 0 : if state.has_timeline_tombstone(ttid, generation) {
553 : // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
554 0 : info!("Timeline {ttid} was already deleted");
555 0 : return Ok(TimelineDeleteResult { dir_existed: false });
556 0 : }
557 :
558 0 : state.get(ttid)
559 : };
560 :
561 0 : let result = match tli_res {
562 0 : Ok(timeline) => {
563 0 : info!("deleting timeline {}, action={:?}", ttid, action);
564 :
565 : // If node is getting excluded, check the generation first.
566 : // Then, while holding the lock cancel the timeline; it will be
567 : // unusable after this point, and if node is added back first
568 : // deletion must be completed and node seeded anew.
569 : //
570 : // We would like to avoid holding the lock while waiting for the
571 : // gate to finish as this is deadlock prone, so for actual
572 : // deletion will take it second time.
573 : //
574 : // Canceling the timeline will block membership switch requests,
575 : // ensuring that the timeline generation will not increase
576 : // after this point, and we will not remove a timeline with a generation
577 : // higher than the requested one.
578 0 : if let DeleteOrExclude::Exclude(ref mconf) = action {
579 0 : let shared_state = timeline.read_shared_state().await;
580 0 : if shared_state.sk.state().mconf.generation > mconf.generation {
581 0 : return Err(DeleteOrExcludeError::Conflict {
582 0 : requested: mconf.clone(),
583 0 : current: shared_state.sk.state().mconf.clone(),
584 0 : });
585 0 : }
586 0 : timeline.cancel();
587 0 : } else {
588 0 : timeline.cancel();
589 0 : }
590 :
591 0 : timeline.close().await;
592 :
593 0 : info!("timeline {ttid} shut down for deletion");
594 :
595 : // Take a lock and finish the deletion holding this mutex.
596 0 : let mut shared_state = timeline.write_shared_state().await;
597 :
598 0 : let only_local = !matches!(action, DeleteOrExclude::Delete);
599 0 : let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
600 :
601 0 : Ok(TimelineDeleteResult { dir_existed })
602 : }
603 : Err(_) => {
604 : // Timeline is not memory, but it may still exist on disk in broken state.
605 0 : let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
606 0 : let dir_existed = delete_dir(&dir_path).await?;
607 :
608 0 : Ok(TimelineDeleteResult { dir_existed })
609 : }
610 : };
611 :
612 : // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
613 : // are used to prevent still-running computes from re-creating the same timeline when they send data,
614 : // and to speed up repeated deletion calls by avoiding re-listing objects.
615 0 : self.state.lock().unwrap().delete(*ttid, generation);
616 :
617 0 : result
618 0 : }
619 :
620 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
621 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
622 : /// created simultaneously. In that case the function will return error and the caller should
623 : /// retry tenant deletion again later.
624 : ///
625 : /// If only_local, doesn't remove WAL segments in remote storage.
626 0 : pub async fn delete_all_for_tenant(
627 0 : &self,
628 0 : tenant_id: &TenantId,
629 0 : action: DeleteOrExclude,
630 0 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
631 0 : info!("deleting all timelines for tenant {}", tenant_id);
632 :
633 : // Adding a tombstone before getting the timelines to prevent new timeline additions
634 0 : self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
635 :
636 0 : let to_delete = self.get_all_for_tenant(*tenant_id);
637 :
638 0 : let mut err = None;
639 :
640 0 : let mut deleted = HashMap::new();
641 0 : for tli in &to_delete {
642 0 : match self.delete_or_exclude(&tli.ttid, action.clone()).await {
643 0 : Ok(result) => {
644 0 : deleted.insert(tli.ttid, result);
645 0 : }
646 0 : Err(e) => {
647 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
648 : // Save error to return later.
649 0 : err = Some(e);
650 : }
651 : }
652 : }
653 :
654 : // If there was an error, return it.
655 0 : if let Some(e) = err {
656 0 : return Err(anyhow::Error::from(e));
657 0 : }
658 :
659 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
660 : // Note that we could concurrently create new timelines while we were deleting them,
661 : // so the directory may be not empty. In this case timelines will have bad state
662 : // and timeline background jobs can panic.
663 0 : let tenant_dir = get_tenant_dir(self.state.lock().unwrap().conf.as_ref(), tenant_id);
664 0 : delete_dir(&tenant_dir).await?;
665 :
666 0 : Ok(deleted)
667 0 : }
668 :
669 0 : pub fn housekeeping(&self, tombstone_ttl: &Duration) {
670 0 : let mut state = self.state.lock().unwrap();
671 :
672 : // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
673 : // timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
674 : // may recreate a deleted timeline.
675 0 : let now = Instant::now();
676 0 : state
677 0 : .timeline_tombstones
678 0 : .retain(|_, v| now.duration_since(v.timestamp) < *tombstone_ttl);
679 0 : state
680 0 : .tenant_tombstones
681 0 : .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
682 0 : }
683 :
684 0 : pub fn get_sk_id(&self) -> NodeId {
685 0 : self.state.lock().unwrap().conf.my_id
686 0 : }
687 : }
688 :
689 : /// Action for delete_or_exclude.
690 : #[derive(Clone, Debug)]
691 : pub enum DeleteOrExclude {
692 : /// Delete timeline globally.
693 : Delete,
694 : /// Legacy mode until we fully migrate to generations: like exclude deletes
695 : /// timeline only locally, but ignores generation number.
696 : DeleteLocal,
697 : /// This node is getting excluded, delete timeline locally.
698 : Exclude(membership::Configuration),
699 : }
700 :
701 : /// Create temp directory for a new timeline. It needs to be located on the same
702 : /// filesystem as the rest of the timelines. It will be automatically deleted when
703 : /// Utf8TempDir goes out of scope.
704 0 : pub async fn create_temp_timeline_dir(
705 0 : conf: &SafeKeeperConf,
706 0 : ttid: TenantTimelineId,
707 0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
708 0 : let temp_base = conf.workdir.join("tmp");
709 :
710 0 : tokio::fs::create_dir_all(&temp_base).await?;
711 :
712 0 : let tli_dir = camino_tempfile::Builder::new()
713 0 : .suffix("_temptli")
714 0 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
715 0 : .tempdir_in(temp_base)?;
716 :
717 0 : let tli_dir_path = tli_dir.path().to_path_buf();
718 :
719 0 : Ok((tli_dir, tli_dir_path))
720 0 : }
721 :
722 : /// Do basic validation of a temp timeline, before moving it to the global map.
723 0 : pub async fn validate_temp_timeline(
724 0 : conf: &SafeKeeperConf,
725 0 : ttid: TenantTimelineId,
726 0 : path: &Utf8PathBuf,
727 0 : generation: Option<SafekeeperGeneration>,
728 0 : ) -> Result<(Lsn, Lsn)> {
729 0 : let control_path = path.join("safekeeper.control");
730 :
731 0 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
732 0 : if control_store.server.wal_seg_size == 0 {
733 0 : bail!("wal_seg_size is not set");
734 0 : }
735 :
736 0 : if let Some(generation) = generation {
737 0 : if control_store.mconf.generation > generation {
738 0 : bail!(
739 0 : "tmp timeline generation {} is higher than expected {generation}",
740 : control_store.mconf.generation
741 : );
742 0 : }
743 0 : }
744 :
745 0 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
746 :
747 0 : let commit_lsn = control_store.commit_lsn;
748 0 : let flush_lsn = wal_store.flush_lsn();
749 :
750 0 : Ok((commit_lsn, flush_lsn))
751 0 : }
752 :
753 : /// A tombstone for a deleted timeline.
754 : /// The generation is passed with "exclude" request and stored in the tombstone.
755 : /// We ignore the tombstone if the request generation is higher than
756 : /// the tombstone generation.
757 : /// If the tombstone doesn't have a generation, it's considered permanent,
758 : /// e.g. after "delete" request.
759 : struct TimelineTombstone {
760 : timestamp: Instant,
761 : generation: Option<SafekeeperGeneration>,
762 : }
763 :
764 : impl TimelineTombstone {
765 0 : fn new(generation: Option<SafekeeperGeneration>) -> Self {
766 0 : TimelineTombstone {
767 0 : timestamp: Instant::now(),
768 0 : generation,
769 0 : }
770 0 : }
771 :
772 : /// Check if the timeline is still valid for the given generation.
773 0 : fn is_valid(&self, generation: SafekeeperGeneration) -> bool {
774 0 : self.generation.is_none_or(|g| g >= generation)
775 0 : }
776 : }
|