//! This module contains the global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
//! All timelines should always be present in this map; this is ensured by loading them
//! all from disk on startup and keeping them in memory.
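//!
//! A minimal usage sketch (hypothetical ids, assuming `GlobalTimelines::init` has already
//! run during safekeeper startup): look up a timeline in the global map.
//!
//! ```ignore
//! use utils::id::TenantTimelineId;
//!
//! let ttid = TenantTimelineId::new(tenant_id, timeline_id); // ids assumed to be known
//! match GlobalTimelines::get(ttid) {
//!     Ok(tli) => info!("timeline {} is loaded and active", tli.ttid),
//!     Err(e) => warn!("timeline is not available: {}", e),
//! }
//! ```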

use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::*;
use utils::id::{TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;

struct GlobalTimelinesState {
    timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
    wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
    conf: Option<SafeKeeperConf>,
    load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
}

// Used to prevent concurrent timeline loading.
pub struct TimelineLoadLock;
29 :
30 : impl GlobalTimelinesState {
31 : /// Get configuration, which must be set once during init.
32 1270 : fn get_conf(&self) -> &SafeKeeperConf {
33 1270 : self.conf
34 1270 : .as_ref()
35 1270 : .expect("GlobalTimelinesState conf is not initialized")
36 1270 : }
37 :
38 : /// Get dependencies for a timeline constructor.
39 528 : fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
40 528 : (
41 528 : self.get_conf().clone(),
42 528 : self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
43 528 : )
44 528 : }
45 :
46 : /// Insert timeline into the map. Returns error if timeline with the same id already exists.
47 479 : fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
48 479 : let ttid = timeline.ttid;
49 479 : if self.timelines.contains_key(&ttid) {
50 0 : bail!(TimelineError::AlreadyExists(ttid));
51 479 : }
52 479 : self.timelines.insert(ttid, timeline);
53 479 : Ok(())
54 479 : }
55 :
56 : /// Get timeline from the map. Returns error if timeline doesn't exist.
57 15790 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
58 15790 : self.timelines
59 15790 : .get(ttid)
60 15790 : .cloned()
61 15790 : .ok_or(TimelineError::NotFound(*ttid))
62 15790 : }
63 : }

static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
    Mutex::new(GlobalTimelinesState {
        timelines: HashMap::new(),
        wal_backup_launcher_tx: None,
        conf: None,
        load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
    })
});

/// A zero-sized struct used to manage access to the global timelines map.
pub struct GlobalTimelines;

impl GlobalTimelines {
    /// Inject dependencies needed for the timeline constructors and load all timelines into memory.
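    ///
    /// A rough sketch of the startup call, assuming `conf` is built elsewhere during
    /// safekeeper startup (the channel capacity below is illustrative):
    ///
    /// ```ignore
    /// let (wal_backup_launcher_tx, wal_backup_launcher_rx) = tokio::sync::mpsc::channel(100);
    /// GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx).await?;
    /// ```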
    pub async fn init(
        conf: SafeKeeperConf,
        wal_backup_launcher_tx: Sender<TenantTimelineId>,
    ) -> Result<()> {
        // clippy isn't smart enough to understand that drop(state) releases the
        // lock, so use an explicit block
        let tenants_dir = {
            let mut state = TIMELINES_STATE.lock().unwrap();
            assert!(state.wal_backup_launcher_tx.is_none());
            state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
            state.conf = Some(conf);

            // Iterate through all directories and load tenants from every directory
            // whose name is a valid tenant_id.
            state.get_conf().workdir.clone()
        };
        let mut tenant_count = 0;
        for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
            .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
        {
            match &tenants_dir_entry {
                Ok(tenants_dir_entry) => {
                    if let Ok(tenant_id) =
                        TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
                    {
                        tenant_count += 1;
                        GlobalTimelines::load_tenant_timelines(tenant_id).await?;
                    }
                }
                Err(e) => error!(
                    "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
                    tenants_dir_entry, tenants_dir, e
                ),
            }
        }

        info!(
            "found {} tenant directories, successfully loaded {} timelines",
            tenant_count,
            TIMELINES_STATE.lock().unwrap().timelines.len()
        );
        Ok(())
    }

    /// Loads all timelines for the given tenant into memory. Returns fs::read_dir
    /// errors if any.
    ///
    /// It is async for update_status_notify's sake. Since the TIMELINES_STATE lock
    /// is sync and there is no important reason to make it async (it is always
    /// held for a short while), we just lock and unlock it for each timeline --
    /// this function is called during init when nothing else is running, so
    /// this is fine.
    async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
        let (conf, wal_backup_launcher_tx) = {
            let state = TIMELINES_STATE.lock().unwrap();
            (
                state.get_conf().clone(),
                state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
            )
        };

        let timelines_dir = conf.tenant_dir(&tenant_id);
        for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
            .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
        {
            match &timelines_dir_entry {
                Ok(timeline_dir_entry) => {
                    if let Ok(timeline_id) =
                        TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
                    {
                        let ttid = TenantTimelineId::new(tenant_id, timeline_id);
                        match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
                            Ok(timeline) => {
                                let tli = Arc::new(timeline);
                                TIMELINES_STATE
                                    .lock()
                                    .unwrap()
                                    .timelines
                                    .insert(ttid, tli.clone());
                                tli.bootstrap(&conf);
                                tli.update_status_notify().await.unwrap();
                            }
                            // If we can't load a timeline, it's most likely because of a corrupted
                            // directory. We will log an error and won't allow deleting or recreating
                            // this timeline. The only way to fix such a timeline is to repair it
                            // manually and restart the safekeeper.
                            Err(e) => error!(
                                "failed to load timeline {} for tenant {}, reason: {:?}",
                                timeline_id, tenant_id, e
                            ),
                        }
                    }
                }
                Err(e) => error!(
                    "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
                    timelines_dir_entry, timelines_dir, e
                ),
            }
        }

        Ok(())
    }

    /// Take a lock for timeline loading.
    pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
        TIMELINES_STATE.lock().unwrap().load_lock.clone()
    }

    /// Load a timeline from disk into memory.
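    ///
    /// Sketch of the expected calling pattern; the `_guard` parameter forces callers to
    /// hold the load lock while loading:
    ///
    /// ```ignore
    /// let load_lock = GlobalTimelines::loading_lock().await;
    /// let guard = load_lock.lock().await;
    /// let tli = GlobalTimelines::load_timeline(&guard, ttid).await?;
    /// ```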
    pub async fn load_timeline<'a>(
        _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
        ttid: TenantTimelineId,
    ) -> Result<Arc<Timeline>> {
        let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();

        match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
            Ok(timeline) => {
                let tli = Arc::new(timeline);

                // TODO: prevent concurrent timeline creation/loading
                TIMELINES_STATE
                    .lock()
                    .unwrap()
                    .timelines
                    .insert(ttid, tli.clone());

                tli.bootstrap(&conf);

                Ok(tli)
            }
            // If we can't load a timeline, it's bad. The caller will figure it out.
            Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
        }
    }

    /// Get the number of timelines in the map.
    pub fn timelines_count() -> usize {
        TIMELINES_STATE.lock().unwrap().timelines.len()
    }

    /// Get the global safekeeper config.
    pub fn get_global_config() -> SafeKeeperConf {
        TIMELINES_STATE.lock().unwrap().get_conf().clone()
    }

    /// Create a new timeline with the given id. If the timeline already exists, returns
    /// the existing timeline.
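    ///
    /// A hedged sketch of a caller; `server_info`, `commit_lsn` and `local_start_lsn` are
    /// placeholders supplied by the compute/proposer:
    ///
    /// ```ignore
    /// let tli = GlobalTimelines::create(ttid, server_info, commit_lsn, local_start_lsn).await?;
    /// // Calling create() again with the same ttid would return the already existing
    /// // timeline instead of failing.
    /// ```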
    pub async fn create(
        ttid: TenantTimelineId,
        server_info: ServerInfo,
        commit_lsn: Lsn,
        local_start_lsn: Lsn,
    ) -> Result<Arc<Timeline>> {
        let (conf, wal_backup_launcher_tx) = {
            let state = TIMELINES_STATE.lock().unwrap();
            if let Ok(timeline) = state.get(&ttid) {
                // Timeline already exists, return it.
                return Ok(timeline);
            }
            state.get_dependencies()
        };

        info!("creating new timeline {}", ttid);

        let timeline = Arc::new(Timeline::create_empty(
            &conf,
            ttid,
            wal_backup_launcher_tx,
            server_info,
            commit_lsn,
            local_start_lsn,
        )?);

        // Take a lock and finish the initialization holding this mutex. No other threads
        // can interfere with creation after we insert the timeline into the map.
        {
            let mut shared_state = timeline.write_shared_state().await;

            // We can get a race condition here in case of concurrent create calls, but only
            // in theory. create() will return a valid timeline on the next try.
            TIMELINES_STATE
                .lock()
                .unwrap()
                .try_insert(timeline.clone())?;

            // Write the new timeline to disk and start background workers.
            // Bootstrap is transactional, so if it fails, the timeline will be deleted,
            // and the state on disk should remain unchanged.
            if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
                // Note: the most likely reason for init failure is that the timeline
                // directory already exists on disk. This happens when the timeline is corrupted
                // and wasn't loaded from disk on startup because of that. We want to preserve
                // the timeline directory in this case, for further inspection.

                // TODO: this is an unusual error, perhaps we should send it to sentry
                // TODO: compute will try to create the timeline every second, we should add backoff
                error!("failed to init new timeline {}: {}", ttid, e);

                // Timeline failed to init, it cannot be used. Remove it from the map.
                TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
                return Err(e);
            }
            // We are done with bootstrap: release the lock and return the timeline.
            // The {} block forces release before .await
        }
        timeline.update_status_notify().await?;
        timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
        Ok(timeline)
    }

    /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
    /// or was corrupted and couldn't be loaded on startup. The returned timeline is always valid,
    /// i.e. loaded in memory and not cancelled.
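    ///
    /// Sketch of typical error handling around this lookup, using the `TimelineError`
    /// variants referenced in this module:
    ///
    /// ```ignore
    /// match GlobalTimelines::get(ttid) {
    ///     Ok(tli) => { /* loaded and not cancelled, safe to use */ }
    ///     Err(TimelineError::Cancelled(_)) => { /* being deleted, reject the request */ }
    ///     Err(e) => { /* not found on disk, or failed to load on startup */ }
    /// }
    /// ```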
    pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
        let res = TIMELINES_STATE.lock().unwrap().get(&ttid);

        match res {
            Ok(tli) => {
                if tli.is_cancelled() {
                    return Err(TimelineError::Cancelled(ttid));
                }
                Ok(tli)
            }
            _ => res,
        }
    }

    /// Returns all timelines. This is used for background timeline processes.
    pub fn get_all() -> Vec<Arc<Timeline>> {
        let global_lock = TIMELINES_STATE.lock().unwrap();
        global_lock
            .timelines
            .values()
            .filter(|t| !t.is_cancelled())
            .cloned()
            .collect()
    }

    /// Returns all timelines belonging to a given tenant. Used for deleting all timelines
    /// of a tenant, which is why it can also return cancelled timelines, so that deleting
    /// them can be retried.
    fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
        let global_lock = TIMELINES_STATE.lock().unwrap();
        global_lock
            .timelines
            .values()
            .filter(|t| t.ttid.tenant_id == tenant_id)
            .cloned()
            .collect()
    }

    /// Cancels the timeline, then deletes the corresponding data directory.
    /// If `only_local` is true, WAL segments in remote storage are not removed.
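    ///
    /// A sketch of a local-only deletion, e.g. from an HTTP handler:
    ///
    /// ```ignore
    /// let result = GlobalTimelines::delete(&ttid, /* only_local */ true).await?;
    /// info!(
    ///     "dir_existed={}, was_active={}",
    ///     result.dir_existed, result.was_active
    /// );
    /// ```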
    pub async fn delete(
        ttid: &TenantTimelineId,
        only_local: bool,
    ) -> Result<TimelineDeleteForceResult> {
        let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
        match tli_res {
            Ok(timeline) => {
                // Take a lock and finish the deletion holding this mutex.
                let mut shared_state = timeline.write_shared_state().await;

                info!("deleting timeline {}, only_local={}", ttid, only_local);
                let (dir_existed, was_active) =
                    timeline.delete(&mut shared_state, only_local).await?;

                // Remove timeline from the map.
                // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
                // https://github.com/neondatabase/neon/issues/3146
                // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);

                Ok(TimelineDeleteForceResult {
                    dir_existed,
                    was_active,
                })
            }
            Err(_) => {
                // Timeline is not in memory, but it may still exist on disk in a broken state.
                let dir_path = TIMELINES_STATE
                    .lock()
                    .unwrap()
                    .get_conf()
                    .timeline_dir(ttid);
                let dir_existed = delete_dir(dir_path)?;

                Ok(TimelineDeleteForceResult {
                    dir_existed,
                    was_active: false,
                })
            }
        }
    }

    /// Deactivates and deletes all timelines for the tenant. Returns a map describing,
    /// for each timeline the tenant had, whether its directory existed and whether it was
    /// active. There may be a race if new timelines are created simultaneously; in that
    /// case the function returns an error and the caller should retry tenant deletion later.
    ///
    /// If `only_local` is true, WAL segments in remote storage are not removed.
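    ///
    /// Sketch of a tenant-wide deletion with the retry-on-race behaviour described above:
    ///
    /// ```ignore
    /// match GlobalTimelines::delete_force_all_for_tenant(&tenant_id, false).await {
    ///     Ok(deleted) => info!("deleted {} timelines", deleted.len()),
    ///     // A new timeline may have been created concurrently; the caller is expected
    ///     // to retry tenant deletion later.
    ///     Err(e) => warn!("tenant deletion incomplete: {}", e),
    /// }
    /// ```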
    pub async fn delete_force_all_for_tenant(
        tenant_id: &TenantId,
        only_local: bool,
    ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
        info!("deleting all timelines for tenant {}", tenant_id);
        let to_delete = Self::get_all_for_tenant(*tenant_id);

        let mut err = None;

        let mut deleted = HashMap::new();
        for tli in &to_delete {
            match Self::delete(&tli.ttid, only_local).await {
                Ok(result) => {
                    deleted.insert(tli.ttid, result);
                }
                Err(e) => {
                    error!("failed to delete timeline {}: {}", tli.ttid, e);
                    // Save error to return later.
                    err = Some(e);
                }
            }
        }

        // If there was an error, return it.
        if let Some(e) = err {
            return Err(e);
        }

        // There may be broken timelines on disk, so delete the whole tenant dir as well.
        // Note that new timelines could have been created concurrently while we were
        // deleting, so the directory may not be empty. In that case timelines will be left
        // in a bad state and timeline background jobs can panic.
        delete_dir(
            TIMELINES_STATE
                .lock()
                .unwrap()
                .get_conf()
                .tenant_dir(tenant_id),
        )?;

        // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
        // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
        // if !tlis_after_delete.is_empty() {
        //     // Some timelines were created while we were deleting them, returning error
        //     // to the caller, so it can retry later.
        //     bail!(
        //         "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
        //         tenant_id
        //     );
        // }

        Ok(deleted)
    }
}

#[derive(Clone, Copy, Serialize)]
pub struct TimelineDeleteForceResult {
    pub dir_existed: bool,
    pub was_active: bool,
}

/// Deletes a directory and its contents. Returns false if the directory does not exist.
fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
    match std::fs::remove_dir_all(path) {
        Ok(_) => Ok(true),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
        Err(e) => Err(e.into()),
    }
}