Line data Source code
//! This module contains the global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
//! All timelines should always be present in this map; this is ensured by loading them
//! all from disk on startup and keeping them in memory.
4 :
5 : use crate::safekeeper::ServerInfo;
6 : use crate::timeline::{Timeline, TimelineError};
7 : use crate::SafeKeeperConf;
8 : use anyhow::{bail, Context, Result};
9 : use once_cell::sync::Lazy;
10 : use serde::Serialize;
11 : use std::collections::HashMap;
12 : use std::path::PathBuf;
13 : use std::str::FromStr;
14 : use std::sync::{Arc, Mutex};
15 : use tokio::sync::mpsc::Sender;
16 : use tracing::*;
17 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
18 : use utils::lsn::Lsn;
19 :
20 : struct GlobalTimelinesState {
21 : timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
22 : wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
23 : conf: Option<SafeKeeperConf>,
24 : }
25 :
26 : impl GlobalTimelinesState {
27 : /// Get configuration, which must be set once during init.
28 1136 : fn get_conf(&self) -> &SafeKeeperConf {
29 1136 : self.conf
30 1136 : .as_ref()
31 1136 : .expect("GlobalTimelinesState conf is not initialized")
32 1136 : }
33 :
34 : /// Get dependencies for a timeline constructor.
35 524 : fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
36 524 : (
37 524 : self.get_conf().clone(),
38 524 : self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
39 524 : )
40 524 : }
41 :
42 : /// Insert timeline into the map. Returns error if timeline with the same id already exists.
43 523 : fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
44 523 : let ttid = timeline.ttid;
45 523 : if self.timelines.contains_key(&ttid) {
46 0 : bail!(TimelineError::AlreadyExists(ttid));
47 523 : }
48 523 : self.timelines.insert(ttid, timeline);
49 523 : Ok(())
50 523 : }
51 :
52 : /// Get timeline from the map. Returns error if timeline doesn't exist.
53 14020 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
54 14020 : self.timelines
55 14020 : .get(ttid)
56 14020 : .cloned()
57 14020 : .ok_or(TimelineError::NotFound(*ttid))
58 14020 : }
59 : }
60 :
61 517 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
62 517 : Mutex::new(GlobalTimelinesState {
63 517 : timelines: HashMap::new(),
64 517 : wal_backup_launcher_tx: None,
65 517 : conf: None,
66 517 : })
67 517 : });
68 :
69 : /// A zero-sized struct used to manage access to the global timelines map.
70 : pub struct GlobalTimelines;
71 :
72 : impl GlobalTimelines {
73 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
74 517 : pub async fn init(
75 517 : conf: SafeKeeperConf,
76 517 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
77 517 : ) -> Result<()> {
78 : // clippy isn't smart enough to understand that drop(state) releases the
79 : // lock, so use explicit block
80 517 : let tenants_dir = {
81 517 : let mut state = TIMELINES_STATE.lock().unwrap();
82 517 : assert!(state.wal_backup_launcher_tx.is_none());
83 517 : state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
84 517 : state.conf = Some(conf);
85 517 :
86 517 : // Iterate through all directories and load tenants for all directories
87 517 : // named as a valid tenant_id.
88 517 : state.get_conf().workdir.clone()
89 517 : };
90 517 : let mut tenant_count = 0;
91 1637 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
92 517 : .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
93 : {
94 1637 : match &tenants_dir_entry {
95 1637 : Ok(tenants_dir_entry) => {
96 83 : if let Ok(tenant_id) =
97 1637 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
98 : {
99 83 : tenant_count += 1;
100 83 : GlobalTimelines::load_tenant_timelines(tenant_id).await?;
101 1554 : }
102 : }
103 0 : Err(e) => error!(
104 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
105 0 : tenants_dir_entry,
106 0 : tenants_dir.display(),
107 0 : e
108 0 : ),
109 : }
110 : }
111 :
112 517 : info!(
113 517 : "found {} tenants directories, successfully loaded {} timelines",
114 517 : tenant_count,
115 517 : TIMELINES_STATE.lock().unwrap().timelines.len()
116 517 : );
117 517 : Ok(())
118 517 : }
119 :
120 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
121 : /// errors if any.
122 : ///
123 : /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
124 : /// sync and there is no important reason to make it async (it is always
125 : /// held for a short while) we just lock and unlock it for each timeline --
126 : /// this function is called during init when nothing else is running, so
127 : /// this is fine.
128 83 : async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
129 83 : let (conf, wal_backup_launcher_tx) = {
130 83 : let state = TIMELINES_STATE.lock().unwrap();
131 83 : (
132 83 : state.get_conf().clone(),
133 83 : state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
134 83 : )
135 83 : };
136 83 :
137 83 : let timelines_dir = conf.tenant_dir(&tenant_id);
138 83 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
139 83 : .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
140 : {
141 80 : match &timelines_dir_entry {
142 80 : Ok(timeline_dir_entry) => {
143 80 : if let Ok(timeline_id) =
144 80 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
145 : {
146 80 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
147 80 : match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
148 80 : Ok(timeline) => {
149 80 : let tli = Arc::new(timeline);
150 80 : TIMELINES_STATE
151 80 : .lock()
152 80 : .unwrap()
153 80 : .timelines
154 80 : .insert(ttid, tli.clone());
155 80 : tli.bootstrap(&conf);
156 80 : tli.update_status_notify().await.unwrap();
157 : }
158 : // If we can't load a timeline, it's most likely because of a corrupted
159 : // directory. We will log an error and won't allow to delete/recreate
160 : // this timeline. The only way to fix this timeline is to repair manually
161 : // and restart the safekeeper.
162 0 : Err(e) => error!(
163 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
164 0 : timeline_id, tenant_id, e
165 0 : ),
166 : }
167 0 : }
168 : }
169 0 : Err(e) => error!(
170 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
171 0 : timelines_dir_entry,
172 0 : timelines_dir.display(),
173 0 : e
174 0 : ),
175 : }
176 : }
177 :
178 83 : Ok(())
179 83 : }
180 :
181 : /// Load timeline from disk to the memory.
182 1 : pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
183 1 : let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
184 1 :
185 1 : match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
186 1 : Ok(timeline) => {
187 1 : let tli = Arc::new(timeline);
188 1 :
189 1 : // TODO: prevent concurrent timeline creation/loading
190 1 : TIMELINES_STATE
191 1 : .lock()
192 1 : .unwrap()
193 1 : .timelines
194 1 : .insert(ttid, tli.clone());
195 1 :
196 1 : tli.bootstrap(&conf);
197 1 :
198 1 : Ok(tli)
199 : }
200 : // If we can't load a timeline, it's bad. Caller will figure it out.
201 0 : Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
202 : }
203 1 : }
204 :
205 : /// Get the number of timelines in the map.
206 5 : pub fn timelines_count() -> usize {
207 5 : TIMELINES_STATE.lock().unwrap().timelines.len()
208 5 : }
209 :
210 : /// Get the global safekeeper config.
211 6 : pub fn get_global_config() -> SafeKeeperConf {
212 6 : TIMELINES_STATE.lock().unwrap().get_conf().clone()
213 6 : }
214 :
215 : /// Create a new timeline with the given id. If the timeline already exists, returns
216 : /// an existing timeline.
217 2117 : pub async fn create(
218 2117 : ttid: TenantTimelineId,
219 2117 : server_info: ServerInfo,
220 2117 : commit_lsn: Lsn,
221 2117 : local_start_lsn: Lsn,
222 2117 : ) -> Result<Arc<Timeline>> {
223 523 : let (conf, wal_backup_launcher_tx) = {
224 2117 : let state = TIMELINES_STATE.lock().unwrap();
225 2117 : if let Ok(timeline) = state.get(&ttid) {
226 : // Timeline already exists, return it.
227 1594 : return Ok(timeline);
228 523 : }
229 523 : state.get_dependencies()
230 : };
231 :
232 523 : info!("creating new timeline {}", ttid);
233 :
234 523 : let timeline = Arc::new(Timeline::create_empty(
235 523 : &conf,
236 523 : ttid,
237 523 : wal_backup_launcher_tx,
238 523 : server_info,
239 523 : commit_lsn,
240 523 : local_start_lsn,
241 523 : )?);
242 :
243 : // Take a lock and finish the initialization holding this mutex. No other threads
244 : // can interfere with creation after we will insert timeline into the map.
245 : {
246 523 : let mut shared_state = timeline.write_shared_state().await;
247 :
248 : // We can get a race condition here in case of concurrent create calls, but only
249 : // in theory. create() will return valid timeline on the next try.
250 523 : TIMELINES_STATE
251 523 : .lock()
252 523 : .unwrap()
253 523 : .try_insert(timeline.clone())?;
254 :
255 : // Write the new timeline to the disk and start background workers.
256 : // Bootstrap is transactional, so if it fails, the timeline will be deleted,
257 : // and the state on disk should remain unchanged.
258 2818 : if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
259 : // Note: the most likely reason for init failure is that the timeline
260 : // directory already exists on disk. This happens when timeline is corrupted
261 : // and wasn't loaded from disk on startup because of that. We want to preserve
262 : // the timeline directory in this case, for further inspection.
263 :
264 : // TODO: this is an unusual error, perhaps we should send it to sentry
265 : // TODO: compute will try to create timeline every second, we should add backoff
266 0 : error!("failed to init new timeline {}: {}", ttid, e);
267 :
268 : // Timeline failed to init, it cannot be used. Remove it from the map.
269 0 : TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
270 0 : return Err(e);
271 523 : }
272 523 : // We are done with bootstrap, release the lock, return the timeline.
273 523 : // {} block forces release before .await
274 523 : }
275 523 : timeline.update_status_notify().await?;
276 523 : timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
277 523 : Ok(timeline)
278 2117 : }
279 :
280 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
281 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
282 : /// i.e. loaded in memory and not cancelled.
283 11870 : pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
284 11870 : let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
285 11870 :
286 11870 : match res {
287 11357 : Ok(tli) => {
288 11357 : if tli.is_cancelled() {
289 0 : return Err(TimelineError::Cancelled(ttid));
290 11357 : }
291 11357 : Ok(tli)
292 : }
293 513 : _ => res,
294 : }
295 11870 : }
296 :
297 : /// Returns all timelines. This is used for background timeline processes.
298 8117 : pub fn get_all() -> Vec<Arc<Timeline>> {
299 8117 : let global_lock = TIMELINES_STATE.lock().unwrap();
300 8117 : global_lock
301 8117 : .timelines
302 8117 : .values()
303 8117 : .cloned()
304 8117 : .filter(|t| !t.is_cancelled())
305 8117 : .collect()
306 8117 : }
307 :
308 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
309 : /// and that's why it can return cancelled timelines, to retry deleting them.
310 4 : fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
311 4 : let global_lock = TIMELINES_STATE.lock().unwrap();
312 4 : global_lock
313 4 : .timelines
314 4 : .values()
315 20 : .filter(|t| t.ttid.tenant_id == tenant_id)
316 4 : .cloned()
317 4 : .collect()
318 4 : }
319 :
320 : /// Cancels timeline, then deletes the corresponding data directory.
321 33 : pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
322 33 : let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
323 33 : match tli_res {
324 31 : Ok(timeline) => {
325 : // Take a lock and finish the deletion holding this mutex.
326 31 : let mut shared_state = timeline.write_shared_state().await;
327 :
328 31 : info!("deleting timeline {}", ttid);
329 31 : let (dir_existed, was_active) =
330 31 : timeline.delete_from_disk(&mut shared_state).await?;
331 :
332 : // Remove timeline from the map.
333 : // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
334 : // https://github.com/neondatabase/neon/issues/3146
335 : // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
336 :
337 31 : Ok(TimelineDeleteForceResult {
338 31 : dir_existed,
339 31 : was_active,
340 31 : })
341 : }
342 : Err(_) => {
343 : // Timeline is not memory, but it may still exist on disk in broken state.
344 2 : let dir_path = TIMELINES_STATE
345 2 : .lock()
346 2 : .unwrap()
347 2 : .get_conf()
348 2 : .timeline_dir(ttid);
349 2 : let dir_existed = delete_dir(dir_path)?;
350 :
351 2 : Ok(TimelineDeleteForceResult {
352 2 : dir_existed,
353 2 : was_active: false,
354 2 : })
355 : }
356 : }
357 33 : }
358 :
359 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
360 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
361 : /// created simultaneously. In that case the function will return error and the caller should
362 : /// retry tenant deletion again later.
363 4 : pub async fn delete_force_all_for_tenant(
364 4 : tenant_id: &TenantId,
365 4 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
366 4 : info!("deleting all timelines for tenant {}", tenant_id);
367 4 : let to_delete = Self::get_all_for_tenant(*tenant_id);
368 4 :
369 4 : let mut err = None;
370 4 :
371 4 : let mut deleted = HashMap::new();
372 20 : for tli in &to_delete {
373 16 : match Self::delete_force(&tli.ttid).await {
374 16 : Ok(result) => {
375 16 : deleted.insert(tli.ttid, result);
376 16 : }
377 0 : Err(e) => {
378 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
379 : // Save error to return later.
380 0 : err = Some(e);
381 : }
382 : }
383 : }
384 :
385 : // If there was an error, return it.
386 4 : if let Some(e) = err {
387 0 : return Err(e);
388 4 : }
389 4 :
390 4 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
391 4 : // Note that we could concurrently create new timelines while we were deleting them,
392 4 : // so the directory may be not empty. In this case timelines will have bad state
393 4 : // and timeline background jobs can panic.
394 4 : delete_dir(
395 4 : TIMELINES_STATE
396 4 : .lock()
397 4 : .unwrap()
398 4 : .get_conf()
399 4 : .tenant_dir(tenant_id),
400 4 : )?;
401 :
402 : // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
403 : // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
404 : // if !tlis_after_delete.is_empty() {
405 : // // Some timelines were created while we were deleting them, returning error
406 : // // to the caller, so it can retry later.
407 : // bail!(
408 : // "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
409 : // tenant_id
410 : // );
411 : // }
412 :
413 4 : Ok(deleted)
414 4 : }
415 : }
416 :
417 33 : #[derive(Clone, Copy, Serialize)]
418 : pub struct TimelineDeleteForceResult {
419 : pub dir_existed: bool,
420 : pub was_active: bool,
421 : }
422 :
423 : /// Deletes directory and it's contents. Returns false if directory does not exist.
424 : fn delete_dir(path: PathBuf) -> Result<bool> {
425 6 : match std::fs::remove_dir_all(path) {
426 2 : Ok(_) => Ok(true),
427 4 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
428 0 : Err(e) => Err(e.into()),
429 : }
430 6 : }
|