TLA Line data Source code
1 : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
2 : //! All timelines should always be present in this map, this is done by loading them
3 : //! all from the disk on startup and keeping them in memory.
4 :
5 : use crate::safekeeper::ServerInfo;
6 : use crate::timeline::{Timeline, TimelineError};
7 : use crate::SafeKeeperConf;
8 : use anyhow::{bail, Context, Result};
9 : use camino::Utf8PathBuf;
10 : use once_cell::sync::Lazy;
11 : use serde::Serialize;
12 : use std::collections::HashMap;
13 : use std::str::FromStr;
14 : use std::sync::{Arc, Mutex};
15 : use tokio::sync::mpsc::Sender;
16 : use tracing::*;
17 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
18 : use utils::lsn::Lsn;
19 :
20 : struct GlobalTimelinesState {
21 : timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
22 : wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
23 : conf: Option<SafeKeeperConf>,
24 : }
25 :
26 : impl GlobalTimelinesState {
27 : /// Get configuration, which must be set once during init.
28 CBC 1080 : fn get_conf(&self) -> &SafeKeeperConf {
29 1080 : self.conf
30 1080 : .as_ref()
31 1080 : .expect("GlobalTimelinesState conf is not initialized")
32 1080 : }
33 :
34 : /// Get dependencies for a timeline constructor.
35 482 : fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
36 482 : (
37 482 : self.get_conf().clone(),
38 482 : self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
39 482 : )
40 482 : }
41 :
42 : /// Insert timeline into the map. Returns error if timeline with the same id already exists.
43 481 : fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
44 481 : let ttid = timeline.ttid;
45 481 : if self.timelines.contains_key(&ttid) {
46 UBC 0 : bail!(TimelineError::AlreadyExists(ttid));
47 CBC 481 : }
48 481 : self.timelines.insert(ttid, timeline);
49 481 : Ok(())
50 481 : }
51 :
52 : /// Get timeline from the map. Returns error if timeline doesn't exist.
53 14460 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
54 14460 : self.timelines
55 14460 : .get(ttid)
56 14460 : .cloned()
57 14460 : .ok_or(TimelineError::NotFound(*ttid))
58 14460 : }
59 : }
60 :
61 500 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
62 500 : Mutex::new(GlobalTimelinesState {
63 500 : timelines: HashMap::new(),
64 500 : wal_backup_launcher_tx: None,
65 500 : conf: None,
66 500 : })
67 500 : });
68 :
/// Zero-sized handle whose associated functions are the only way to reach the
/// global timelines map.
pub struct GlobalTimelines;
71 :
72 : impl GlobalTimelines {
73 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
74 500 : pub async fn init(
75 500 : conf: SafeKeeperConf,
76 500 : wal_backup_launcher_tx: Sender<TenantTimelineId>,
77 500 : ) -> Result<()> {
78 : // clippy isn't smart enough to understand that drop(state) releases the
79 : // lock, so use explicit block
80 500 : let tenants_dir = {
81 500 : let mut state = TIMELINES_STATE.lock().unwrap();
82 500 : assert!(state.wal_backup_launcher_tx.is_none());
83 500 : state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
84 500 : state.conf = Some(conf);
85 500 :
86 500 : // Iterate through all directories and load tenants for all directories
87 500 : // named as a valid tenant_id.
88 500 : state.get_conf().workdir.clone()
89 500 : };
90 500 : let mut tenant_count = 0;
91 1589 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
92 500 : .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
93 : {
94 1589 : match &tenants_dir_entry {
95 1589 : Ok(tenants_dir_entry) => {
96 86 : if let Ok(tenant_id) =
97 1589 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
98 : {
99 86 : tenant_count += 1;
100 86 : GlobalTimelines::load_tenant_timelines(tenant_id).await?;
101 1503 : }
102 : }
103 UBC 0 : Err(e) => error!(
104 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
105 0 : tenants_dir_entry, tenants_dir, e
106 0 : ),
107 : }
108 : }
109 :
110 CBC 500 : info!(
111 500 : "found {} tenants directories, successfully loaded {} timelines",
112 500 : tenant_count,
113 500 : TIMELINES_STATE.lock().unwrap().timelines.len()
114 500 : );
115 500 : Ok(())
116 500 : }
117 :
118 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
119 : /// errors if any.
120 : ///
121 : /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
122 : /// sync and there is no important reason to make it async (it is always
123 : /// held for a short while) we just lock and unlock it for each timeline --
124 : /// this function is called during init when nothing else is running, so
125 : /// this is fine.
126 86 : async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
127 86 : let (conf, wal_backup_launcher_tx) = {
128 86 : let state = TIMELINES_STATE.lock().unwrap();
129 86 : (
130 86 : state.get_conf().clone(),
131 86 : state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
132 86 : )
133 86 : };
134 86 :
135 86 : let timelines_dir = conf.tenant_dir(&tenant_id);
136 86 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
137 86 : .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
138 : {
139 83 : match &timelines_dir_entry {
140 83 : Ok(timeline_dir_entry) => {
141 83 : if let Ok(timeline_id) =
142 83 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
143 : {
144 83 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
145 83 : match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx.clone()) {
146 83 : Ok(timeline) => {
147 83 : let tli = Arc::new(timeline);
148 83 : TIMELINES_STATE
149 83 : .lock()
150 83 : .unwrap()
151 83 : .timelines
152 83 : .insert(ttid, tli.clone());
153 83 : tli.bootstrap(&conf);
154 83 : tli.update_status_notify().await.unwrap();
155 : }
156 : // If we can't load a timeline, it's most likely because of a corrupted
157 : // directory. We will log an error and won't allow to delete/recreate
158 : // this timeline. The only way to fix this timeline is to repair manually
159 : // and restart the safekeeper.
160 UBC 0 : Err(e) => error!(
161 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
162 0 : timeline_id, tenant_id, e
163 0 : ),
164 : }
165 0 : }
166 : }
167 0 : Err(e) => error!(
168 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
169 0 : timelines_dir_entry, timelines_dir, e
170 0 : ),
171 : }
172 : }
173 :
174 CBC 86 : Ok(())
175 86 : }
176 :
177 : /// Load timeline from disk to the memory.
178 1 : pub async fn load_timeline(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
179 1 : let (conf, wal_backup_launcher_tx) = TIMELINES_STATE.lock().unwrap().get_dependencies();
180 1 :
181 1 : match Timeline::load_timeline(&conf, ttid, wal_backup_launcher_tx) {
182 1 : Ok(timeline) => {
183 1 : let tli = Arc::new(timeline);
184 1 :
185 1 : // TODO: prevent concurrent timeline creation/loading
186 1 : TIMELINES_STATE
187 1 : .lock()
188 1 : .unwrap()
189 1 : .timelines
190 1 : .insert(ttid, tli.clone());
191 1 :
192 1 : tli.bootstrap(&conf);
193 1 :
194 1 : Ok(tli)
195 : }
196 : // If we can't load a timeline, it's bad. Caller will figure it out.
197 UBC 0 : Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
198 : }
199 CBC 1 : }
200 :
201 : /// Get the number of timelines in the map.
202 5 : pub fn timelines_count() -> usize {
203 5 : TIMELINES_STATE.lock().unwrap().timelines.len()
204 5 : }
205 :
206 : /// Get the global safekeeper config.
207 6 : pub fn get_global_config() -> SafeKeeperConf {
208 6 : TIMELINES_STATE.lock().unwrap().get_conf().clone()
209 6 : }
210 :
211 : /// Create a new timeline with the given id. If the timeline already exists, returns
212 : /// an existing timeline.
213 2024 : pub async fn create(
214 2024 : ttid: TenantTimelineId,
215 2024 : server_info: ServerInfo,
216 2024 : commit_lsn: Lsn,
217 2024 : local_start_lsn: Lsn,
218 2024 : ) -> Result<Arc<Timeline>> {
219 481 : let (conf, wal_backup_launcher_tx) = {
220 2024 : let state = TIMELINES_STATE.lock().unwrap();
221 2024 : if let Ok(timeline) = state.get(&ttid) {
222 : // Timeline already exists, return it.
223 1543 : return Ok(timeline);
224 481 : }
225 481 : state.get_dependencies()
226 : };
227 :
228 481 : info!("creating new timeline {}", ttid);
229 :
230 481 : let timeline = Arc::new(Timeline::create_empty(
231 481 : &conf,
232 481 : ttid,
233 481 : wal_backup_launcher_tx,
234 481 : server_info,
235 481 : commit_lsn,
236 481 : local_start_lsn,
237 481 : )?);
238 :
239 : // Take a lock and finish the initialization holding this mutex. No other threads
240 : // can interfere with creation after we will insert timeline into the map.
241 : {
242 481 : let mut shared_state = timeline.write_shared_state().await;
243 :
244 : // We can get a race condition here in case of concurrent create calls, but only
245 : // in theory. create() will return valid timeline on the next try.
246 481 : TIMELINES_STATE
247 481 : .lock()
248 481 : .unwrap()
249 481 : .try_insert(timeline.clone())?;
250 :
251 : // Write the new timeline to the disk and start background workers.
252 : // Bootstrap is transactional, so if it fails, the timeline will be deleted,
253 : // and the state on disk should remain unchanged.
254 2602 : if let Err(e) = timeline.init_new(&mut shared_state, &conf).await {
255 : // Note: the most likely reason for init failure is that the timeline
256 : // directory already exists on disk. This happens when timeline is corrupted
257 : // and wasn't loaded from disk on startup because of that. We want to preserve
258 : // the timeline directory in this case, for further inspection.
259 :
260 : // TODO: this is an unusual error, perhaps we should send it to sentry
261 : // TODO: compute will try to create timeline every second, we should add backoff
262 UBC 0 : error!("failed to init new timeline {}: {}", ttid, e);
263 :
264 : // Timeline failed to init, it cannot be used. Remove it from the map.
265 0 : TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
266 0 : return Err(e);
267 CBC 481 : }
268 481 : // We are done with bootstrap, release the lock, return the timeline.
269 481 : // {} block forces release before .await
270 481 : }
271 481 : timeline.update_status_notify().await?;
272 481 : timeline.wal_backup_launcher_tx.send(timeline.ttid).await?;
273 481 : Ok(timeline)
274 2024 : }
275 :
276 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
277 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
278 : /// i.e. loaded in memory and not cancelled.
279 12403 : pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
280 12403 : let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
281 12403 :
282 12403 : match res {
283 11930 : Ok(tli) => {
284 11930 : if tli.is_cancelled() {
285 UBC 0 : return Err(TimelineError::Cancelled(ttid));
286 CBC 11930 : }
287 11930 : Ok(tli)
288 : }
289 473 : _ => res,
290 : }
291 12403 : }
292 :
293 : /// Returns all timelines. This is used for background timeline processes.
294 8566 : pub fn get_all() -> Vec<Arc<Timeline>> {
295 8566 : let global_lock = TIMELINES_STATE.lock().unwrap();
296 8566 : global_lock
297 8566 : .timelines
298 8566 : .values()
299 8566 : .cloned()
300 8566 : .filter(|t| !t.is_cancelled())
301 8566 : .collect()
302 8566 : }
303 :
304 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
305 : /// and that's why it can return cancelled timelines, to retry deleting them.
306 4 : fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
307 4 : let global_lock = TIMELINES_STATE.lock().unwrap();
308 4 : global_lock
309 4 : .timelines
310 4 : .values()
311 20 : .filter(|t| t.ttid.tenant_id == tenant_id)
312 4 : .cloned()
313 4 : .collect()
314 4 : }
315 :
316 : /// Cancels timeline, then deletes the corresponding data directory.
317 33 : pub async fn delete_force(ttid: &TenantTimelineId) -> Result<TimelineDeleteForceResult> {
318 33 : let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
319 33 : match tli_res {
320 31 : Ok(timeline) => {
321 : // Take a lock and finish the deletion holding this mutex.
322 31 : let mut shared_state = timeline.write_shared_state().await;
323 :
324 31 : info!("deleting timeline {}", ttid);
325 31 : let (dir_existed, was_active) =
326 31 : timeline.delete_from_disk(&mut shared_state).await?;
327 :
328 : // Remove timeline from the map.
329 : // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
330 : // https://github.com/neondatabase/neon/issues/3146
331 : // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
332 :
333 31 : Ok(TimelineDeleteForceResult {
334 31 : dir_existed,
335 31 : was_active,
336 31 : })
337 : }
338 : Err(_) => {
339 : // Timeline is not memory, but it may still exist on disk in broken state.
340 2 : let dir_path = TIMELINES_STATE
341 2 : .lock()
342 2 : .unwrap()
343 2 : .get_conf()
344 2 : .timeline_dir(ttid);
345 2 : let dir_existed = delete_dir(dir_path)?;
346 :
347 2 : Ok(TimelineDeleteForceResult {
348 2 : dir_existed,
349 2 : was_active: false,
350 2 : })
351 : }
352 : }
353 33 : }
354 :
355 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
356 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
357 : /// created simultaneously. In that case the function will return error and the caller should
358 : /// retry tenant deletion again later.
359 4 : pub async fn delete_force_all_for_tenant(
360 4 : tenant_id: &TenantId,
361 4 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
362 4 : info!("deleting all timelines for tenant {}", tenant_id);
363 4 : let to_delete = Self::get_all_for_tenant(*tenant_id);
364 4 :
365 4 : let mut err = None;
366 4 :
367 4 : let mut deleted = HashMap::new();
368 20 : for tli in &to_delete {
369 16 : match Self::delete_force(&tli.ttid).await {
370 16 : Ok(result) => {
371 16 : deleted.insert(tli.ttid, result);
372 16 : }
373 UBC 0 : Err(e) => {
374 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
375 : // Save error to return later.
376 0 : err = Some(e);
377 : }
378 : }
379 : }
380 :
381 : // If there was an error, return it.
382 CBC 4 : if let Some(e) = err {
383 UBC 0 : return Err(e);
384 CBC 4 : }
385 4 :
386 4 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
387 4 : // Note that we could concurrently create new timelines while we were deleting them,
388 4 : // so the directory may be not empty. In this case timelines will have bad state
389 4 : // and timeline background jobs can panic.
390 4 : delete_dir(
391 4 : TIMELINES_STATE
392 4 : .lock()
393 4 : .unwrap()
394 4 : .get_conf()
395 4 : .tenant_dir(tenant_id),
396 4 : )?;
397 :
398 : // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
399 : // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
400 : // if !tlis_after_delete.is_empty() {
401 : // // Some timelines were created while we were deleting them, returning error
402 : // // to the caller, so it can retry later.
403 : // bail!(
404 : // "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
405 : // tenant_id
406 : // );
407 : // }
408 :
409 4 : Ok(deleted)
410 4 : }
411 : }
412 :
413 33 : #[derive(Clone, Copy, Serialize)]
414 : pub struct TimelineDeleteForceResult {
415 : pub dir_existed: bool,
416 : pub was_active: bool,
417 : }
418 :
419 : /// Deletes directory and it's contents. Returns false if directory does not exist.
420 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
421 6 : match std::fs::remove_dir_all(path) {
422 2 : Ok(_) => Ok(true),
423 4 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
424 UBC 0 : Err(e) => Err(e.into()),
425 : }
426 CBC 6 : }
|