Line data Source code
//! This module contains the global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
//! All timelines should always be present in this map; this is achieved by loading them
//! all from disk on startup and keeping them in memory.
4 :
5 : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
6 : use crate::rate_limit::RateLimiter;
7 : use crate::safekeeper::ServerInfo;
8 : use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
9 : use crate::timelines_set::TimelinesSet;
10 : use crate::SafeKeeperConf;
11 : use anyhow::{bail, Context, Result};
12 : use camino::Utf8PathBuf;
13 : use once_cell::sync::Lazy;
14 : use serde::Serialize;
15 : use std::collections::HashMap;
16 : use std::str::FromStr;
17 : use std::sync::atomic::Ordering;
18 : use std::sync::{Arc, Mutex};
19 : use std::time::{Duration, Instant};
20 : use tracing::*;
21 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
22 : use utils::lsn::Lsn;
23 :
24 : struct GlobalTimelinesState {
25 : timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
26 :
27 : // A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
28 : // on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
29 : // this map is dropped on restart.
30 : tombstones: HashMap<TenantTimelineId, Instant>,
31 :
32 : conf: Option<SafeKeeperConf>,
33 : broker_active_set: Arc<TimelinesSet>,
34 : load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
35 : global_rate_limiter: RateLimiter,
36 : }
37 :
/// Zero-sized token kept behind a `tokio::sync::Mutex` in the global state; holding that
/// mutex's guard is what prevents concurrent timeline loading.
pub struct TimelineLoadLock;
40 :
41 : impl GlobalTimelinesState {
42 : /// Get configuration, which must be set once during init.
43 0 : fn get_conf(&self) -> &SafeKeeperConf {
44 0 : self.conf
45 0 : .as_ref()
46 0 : .expect("GlobalTimelinesState conf is not initialized")
47 0 : }
48 :
49 : /// Get dependencies for a timeline constructor.
50 0 : fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>, RateLimiter) {
51 0 : (
52 0 : self.get_conf().clone(),
53 0 : self.broker_active_set.clone(),
54 0 : self.global_rate_limiter.clone(),
55 0 : )
56 0 : }
57 :
58 : /// Insert timeline into the map. Returns error if timeline with the same id already exists.
59 0 : fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
60 0 : let ttid = timeline.ttid;
61 0 : if self.timelines.contains_key(&ttid) {
62 0 : bail!(TimelineError::AlreadyExists(ttid));
63 0 : }
64 0 : self.timelines.insert(ttid, timeline);
65 0 : Ok(())
66 0 : }
67 :
68 : /// Get timeline from the map. Returns error if timeline doesn't exist.
69 0 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
70 0 : self.timelines
71 0 : .get(ttid)
72 0 : .cloned()
73 0 : .ok_or(TimelineError::NotFound(*ttid))
74 0 : }
75 :
76 0 : fn delete(&mut self, ttid: TenantTimelineId) {
77 0 : self.timelines.remove(&ttid);
78 0 : self.tombstones.insert(ttid, Instant::now());
79 0 : }
80 : }
81 :
82 0 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
83 0 : Mutex::new(GlobalTimelinesState {
84 0 : timelines: HashMap::new(),
85 0 : tombstones: HashMap::new(),
86 0 : conf: None,
87 0 : broker_active_set: Arc::new(TimelinesSet::default()),
88 0 : load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
89 0 : global_rate_limiter: RateLimiter::new(1, 1),
90 0 : })
91 0 : });
92 :
/// Zero-sized handle; its associated functions are the only entry points into the
/// global timelines map.
pub struct GlobalTimelines;
95 :
96 : impl GlobalTimelines {
97 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
98 0 : pub async fn init(conf: SafeKeeperConf) -> Result<()> {
99 0 : // clippy isn't smart enough to understand that drop(state) releases the
100 0 : // lock, so use explicit block
101 0 : let tenants_dir = {
102 0 : let mut state = TIMELINES_STATE.lock().unwrap();
103 0 : state.global_rate_limiter = RateLimiter::new(
104 0 : conf.partial_backup_concurrency,
105 0 : DEFAULT_EVICTION_CONCURRENCY,
106 0 : );
107 0 : state.conf = Some(conf);
108 0 :
109 0 : // Iterate through all directories and load tenants for all directories
110 0 : // named as a valid tenant_id.
111 0 : state.get_conf().workdir.clone()
112 0 : };
113 0 : let mut tenant_count = 0;
114 0 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
115 0 : .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
116 : {
117 0 : match &tenants_dir_entry {
118 0 : Ok(tenants_dir_entry) => {
119 0 : if let Ok(tenant_id) =
120 0 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
121 : {
122 0 : tenant_count += 1;
123 0 : GlobalTimelines::load_tenant_timelines(tenant_id).await?;
124 0 : }
125 : }
126 0 : Err(e) => error!(
127 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
128 : tenants_dir_entry, tenants_dir, e
129 : ),
130 : }
131 : }
132 :
133 0 : info!(
134 0 : "found {} tenants directories, successfully loaded {} timelines",
135 0 : tenant_count,
136 0 : TIMELINES_STATE.lock().unwrap().timelines.len()
137 : );
138 0 : Ok(())
139 0 : }
140 :
141 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
142 : /// errors if any.
143 : ///
144 : /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
145 : /// sync and there is no important reason to make it async (it is always
146 : /// held for a short while) we just lock and unlock it for each timeline --
147 : /// this function is called during init when nothing else is running, so
148 : /// this is fine.
149 0 : async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
150 0 : let (conf, broker_active_set, partial_backup_rate_limiter) = {
151 0 : let state = TIMELINES_STATE.lock().unwrap();
152 0 : state.get_dependencies()
153 0 : };
154 0 :
155 0 : let timelines_dir = get_tenant_dir(&conf, &tenant_id);
156 0 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
157 0 : .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
158 : {
159 0 : match &timelines_dir_entry {
160 0 : Ok(timeline_dir_entry) => {
161 0 : if let Ok(timeline_id) =
162 0 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
163 : {
164 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
165 0 : match Timeline::load_timeline(&conf, ttid) {
166 0 : Ok(timeline) => {
167 0 : let tli = Arc::new(timeline);
168 0 : let mut shared_state = tli.write_shared_state().await;
169 0 : TIMELINES_STATE
170 0 : .lock()
171 0 : .unwrap()
172 0 : .timelines
173 0 : .insert(ttid, tli.clone());
174 0 : tli.bootstrap(
175 0 : &mut shared_state,
176 0 : &conf,
177 0 : broker_active_set.clone(),
178 0 : partial_backup_rate_limiter.clone(),
179 0 : );
180 : }
181 : // If we can't load a timeline, it's most likely because of a corrupted
182 : // directory. We will log an error and won't allow to delete/recreate
183 : // this timeline. The only way to fix this timeline is to repair manually
184 : // and restart the safekeeper.
185 0 : Err(e) => error!(
186 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
187 : timeline_id, tenant_id, e
188 : ),
189 : }
190 0 : }
191 : }
192 0 : Err(e) => error!(
193 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
194 : timelines_dir_entry, timelines_dir, e
195 : ),
196 : }
197 : }
198 :
199 0 : Ok(())
200 0 : }
201 :
202 : /// Take a lock for timeline loading.
203 0 : pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
204 0 : TIMELINES_STATE.lock().unwrap().load_lock.clone()
205 0 : }
206 :
207 : /// Load timeline from disk to the memory.
208 0 : pub async fn load_timeline<'a>(
209 0 : _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
210 0 : ttid: TenantTimelineId,
211 0 : ) -> Result<Arc<Timeline>> {
212 0 : let (conf, broker_active_set, partial_backup_rate_limiter) =
213 0 : TIMELINES_STATE.lock().unwrap().get_dependencies();
214 0 :
215 0 : match Timeline::load_timeline(&conf, ttid) {
216 0 : Ok(timeline) => {
217 0 : let tli = Arc::new(timeline);
218 0 : let mut shared_state = tli.write_shared_state().await;
219 :
220 : // TODO: prevent concurrent timeline creation/loading
221 : {
222 0 : let mut state = TIMELINES_STATE.lock().unwrap();
223 0 :
224 0 : // We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
225 0 : // that the human doing this manual intervention knows what they are doing, and remove its tombstone.
226 0 : if state.tombstones.remove(&ttid).is_some() {
227 0 : warn!("Un-deleted timeline {ttid}");
228 0 : }
229 :
230 0 : state.timelines.insert(ttid, tli.clone());
231 0 : }
232 0 :
233 0 : tli.bootstrap(
234 0 : &mut shared_state,
235 0 : &conf,
236 0 : broker_active_set,
237 0 : partial_backup_rate_limiter,
238 0 : );
239 0 : drop(shared_state);
240 0 : Ok(tli)
241 : }
242 : // If we can't load a timeline, it's bad. Caller will figure it out.
243 0 : Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
244 : }
245 0 : }
246 :
247 : /// Get the number of timelines in the map.
248 0 : pub fn timelines_count() -> usize {
249 0 : TIMELINES_STATE.lock().unwrap().timelines.len()
250 0 : }
251 :
252 : /// Get the global safekeeper config.
253 0 : pub fn get_global_config() -> SafeKeeperConf {
254 0 : TIMELINES_STATE.lock().unwrap().get_conf().clone()
255 0 : }
256 :
257 0 : pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
258 0 : TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
259 0 : }
260 :
261 : /// Create a new timeline with the given id. If the timeline already exists, returns
262 : /// an existing timeline.
263 0 : pub(crate) async fn create(
264 0 : ttid: TenantTimelineId,
265 0 : server_info: ServerInfo,
266 0 : commit_lsn: Lsn,
267 0 : local_start_lsn: Lsn,
268 0 : ) -> Result<Arc<Timeline>> {
269 0 : let (conf, broker_active_set, partial_backup_rate_limiter) = {
270 0 : let state = TIMELINES_STATE.lock().unwrap();
271 0 : if let Ok(timeline) = state.get(&ttid) {
272 : // Timeline already exists, return it.
273 0 : return Ok(timeline);
274 0 : }
275 0 :
276 0 : if state.tombstones.contains_key(&ttid) {
277 0 : anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
278 0 : }
279 0 :
280 0 : state.get_dependencies()
281 0 : };
282 0 :
283 0 : info!("creating new timeline {}", ttid);
284 :
285 0 : let timeline = Arc::new(Timeline::create_empty(
286 0 : &conf,
287 0 : ttid,
288 0 : server_info,
289 0 : commit_lsn,
290 0 : local_start_lsn,
291 0 : )?);
292 :
293 : // Take a lock and finish the initialization holding this mutex. No other threads
294 : // can interfere with creation after we will insert timeline into the map.
295 : {
296 0 : let mut shared_state = timeline.write_shared_state().await;
297 :
298 : // We can get a race condition here in case of concurrent create calls, but only
299 : // in theory. create() will return valid timeline on the next try.
300 0 : TIMELINES_STATE
301 0 : .lock()
302 0 : .unwrap()
303 0 : .try_insert(timeline.clone())?;
304 :
305 : // Write the new timeline to the disk and start background workers.
306 : // Bootstrap is transactional, so if it fails, the timeline will be deleted,
307 : // and the state on disk should remain unchanged.
308 0 : if let Err(e) = timeline
309 0 : .init_new(
310 0 : &mut shared_state,
311 0 : &conf,
312 0 : broker_active_set,
313 0 : partial_backup_rate_limiter,
314 0 : )
315 0 : .await
316 : {
317 : // Note: the most likely reason for init failure is that the timeline
318 : // directory already exists on disk. This happens when timeline is corrupted
319 : // and wasn't loaded from disk on startup because of that. We want to preserve
320 : // the timeline directory in this case, for further inspection.
321 :
322 : // TODO: this is an unusual error, perhaps we should send it to sentry
323 : // TODO: compute will try to create timeline every second, we should add backoff
324 0 : error!("failed to init new timeline {}: {}", ttid, e);
325 :
326 : // Timeline failed to init, it cannot be used. Remove it from the map.
327 0 : TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
328 0 : return Err(e);
329 0 : }
330 0 : // We are done with bootstrap, release the lock, return the timeline.
331 0 : // {} block forces release before .await
332 0 : }
333 0 : Ok(timeline)
334 0 : }
335 :
336 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
337 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
338 : /// i.e. loaded in memory and not cancelled.
339 0 : pub(crate) fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
340 0 : let tli_res = {
341 0 : let state = TIMELINES_STATE.lock().unwrap();
342 0 : state.get(&ttid)
343 0 : };
344 0 : match tli_res {
345 0 : Ok(tli) => {
346 0 : if tli.is_cancelled() {
347 0 : return Err(TimelineError::Cancelled(ttid));
348 0 : }
349 0 : Ok(tli)
350 : }
351 0 : _ => tli_res,
352 : }
353 0 : }
354 :
355 : /// Returns all timelines. This is used for background timeline processes.
356 0 : pub fn get_all() -> Vec<Arc<Timeline>> {
357 0 : let global_lock = TIMELINES_STATE.lock().unwrap();
358 0 : global_lock
359 0 : .timelines
360 0 : .values()
361 0 : .filter(|t| !t.is_cancelled())
362 0 : .cloned()
363 0 : .collect()
364 0 : }
365 :
366 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
367 : /// and that's why it can return cancelled timelines, to retry deleting them.
368 0 : fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
369 0 : let global_lock = TIMELINES_STATE.lock().unwrap();
370 0 : global_lock
371 0 : .timelines
372 0 : .values()
373 0 : .filter(|t| t.ttid.tenant_id == tenant_id)
374 0 : .cloned()
375 0 : .collect()
376 0 : }
377 :
378 : /// Cancels timeline, then deletes the corresponding data directory.
379 : /// If only_local, doesn't remove WAL segments in remote storage.
380 0 : pub(crate) async fn delete(
381 0 : ttid: &TenantTimelineId,
382 0 : only_local: bool,
383 0 : ) -> Result<TimelineDeleteForceResult> {
384 0 : let tli_res = {
385 0 : let state = TIMELINES_STATE.lock().unwrap();
386 0 :
387 0 : if state.tombstones.contains_key(ttid) {
388 : // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
389 0 : info!("Timeline {ttid} was already deleted");
390 0 : return Ok(TimelineDeleteForceResult {
391 0 : dir_existed: false,
392 0 : was_active: false,
393 0 : });
394 0 : }
395 0 :
396 0 : state.get(ttid)
397 : };
398 :
399 0 : let result = match tli_res {
400 0 : Ok(timeline) => {
401 0 : let was_active = timeline.broker_active.load(Ordering::Relaxed);
402 :
403 : // Take a lock and finish the deletion holding this mutex.
404 0 : let mut shared_state = timeline.write_shared_state().await;
405 :
406 0 : info!("deleting timeline {}, only_local={}", ttid, only_local);
407 0 : let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
408 :
409 0 : Ok(TimelineDeleteForceResult {
410 0 : dir_existed,
411 0 : was_active, // TODO: we probably should remove this field
412 0 : })
413 : }
414 : Err(_) => {
415 : // Timeline is not memory, but it may still exist on disk in broken state.
416 0 : let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
417 0 : let dir_existed = delete_dir(dir_path)?;
418 :
419 0 : Ok(TimelineDeleteForceResult {
420 0 : dir_existed,
421 0 : was_active: false,
422 0 : })
423 : }
424 : };
425 :
426 : // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
427 : // are used to prevent still-running computes from re-creating the same timeline when they send data,
428 : // and to speed up repeated deletion calls by avoiding re-listing objects.
429 0 : TIMELINES_STATE.lock().unwrap().delete(*ttid);
430 0 :
431 0 : result
432 0 : }
433 :
434 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
435 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
436 : /// created simultaneously. In that case the function will return error and the caller should
437 : /// retry tenant deletion again later.
438 : ///
439 : /// If only_local, doesn't remove WAL segments in remote storage.
440 0 : pub async fn delete_force_all_for_tenant(
441 0 : tenant_id: &TenantId,
442 0 : only_local: bool,
443 0 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
444 0 : info!("deleting all timelines for tenant {}", tenant_id);
445 0 : let to_delete = Self::get_all_for_tenant(*tenant_id);
446 0 :
447 0 : let mut err = None;
448 0 :
449 0 : let mut deleted = HashMap::new();
450 0 : for tli in &to_delete {
451 0 : match Self::delete(&tli.ttid, only_local).await {
452 0 : Ok(result) => {
453 0 : deleted.insert(tli.ttid, result);
454 0 : }
455 0 : Err(e) => {
456 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
457 : // Save error to return later.
458 0 : err = Some(e);
459 : }
460 : }
461 : }
462 :
463 : // If there was an error, return it.
464 0 : if let Some(e) = err {
465 0 : return Err(e);
466 0 : }
467 0 :
468 0 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
469 0 : // Note that we could concurrently create new timelines while we were deleting them,
470 0 : // so the directory may be not empty. In this case timelines will have bad state
471 0 : // and timeline background jobs can panic.
472 0 : delete_dir(get_tenant_dir(
473 0 : TIMELINES_STATE.lock().unwrap().get_conf(),
474 0 : tenant_id,
475 0 : ))?;
476 :
477 0 : Ok(deleted)
478 0 : }
479 :
480 0 : pub fn housekeeping(tombstone_ttl: &Duration) {
481 0 : let mut state = TIMELINES_STATE.lock().unwrap();
482 0 :
483 0 : // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
484 0 : // timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
485 0 : // may recreate a deleted timeline.
486 0 : let now = Instant::now();
487 0 : state
488 0 : .tombstones
489 0 : .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
490 0 : }
491 : }
492 :
493 : #[derive(Clone, Copy, Serialize)]
494 : pub struct TimelineDeleteForceResult {
495 : pub dir_existed: bool,
496 : pub was_active: bool,
497 : }
498 :
499 : /// Deletes directory and it's contents. Returns false if directory does not exist.
500 0 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
501 0 : match std::fs::remove_dir_all(path) {
502 0 : Ok(_) => Ok(true),
503 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
504 0 : Err(e) => Err(e.into()),
505 : }
506 0 : }
|