Line data Source code
1 : //! This module contains the global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
2 : //! All timelines should always be present in this map; this is achieved by loading
3 : //! all of them from disk on startup and keeping them in memory.
4 :
5 : use crate::safekeeper::ServerInfo;
6 : use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
7 : use crate::timelines_set::TimelinesSet;
8 : use crate::SafeKeeperConf;
9 : use anyhow::{bail, Context, Result};
10 : use camino::Utf8PathBuf;
11 : use once_cell::sync::Lazy;
12 : use serde::Serialize;
13 : use std::collections::HashMap;
14 : use std::str::FromStr;
15 : use std::sync::atomic::Ordering;
16 : use std::sync::{Arc, Mutex};
17 : use tracing::*;
18 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
19 : use utils::lsn::Lsn;
20 :
21 : struct GlobalTimelinesState {
22 : timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
23 : conf: Option<SafeKeeperConf>,
24 : broker_active_set: Arc<TimelinesSet>,
25 : load_lock: Arc<tokio::sync::Mutex<TimelineLoadLock>>,
26 : }
27 :
/// Marker type stored inside the async loading mutex; holding a guard over it
/// is what prevents concurrent timeline loading.
pub struct TimelineLoadLock;
30 :
31 : impl GlobalTimelinesState {
32 : /// Get configuration, which must be set once during init.
33 0 : fn get_conf(&self) -> &SafeKeeperConf {
34 0 : self.conf
35 0 : .as_ref()
36 0 : .expect("GlobalTimelinesState conf is not initialized")
37 0 : }
38 :
39 : /// Get dependencies for a timeline constructor.
40 0 : fn get_dependencies(&self) -> (SafeKeeperConf, Arc<TimelinesSet>) {
41 0 : (self.get_conf().clone(), self.broker_active_set.clone())
42 0 : }
43 :
44 : /// Insert timeline into the map. Returns error if timeline with the same id already exists.
45 0 : fn try_insert(&mut self, timeline: Arc<Timeline>) -> Result<()> {
46 0 : let ttid = timeline.ttid;
47 0 : if self.timelines.contains_key(&ttid) {
48 0 : bail!(TimelineError::AlreadyExists(ttid));
49 0 : }
50 0 : self.timelines.insert(ttid, timeline);
51 0 : Ok(())
52 0 : }
53 :
54 : /// Get timeline from the map. Returns error if timeline doesn't exist.
55 0 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
56 0 : self.timelines
57 0 : .get(ttid)
58 0 : .cloned()
59 0 : .ok_or(TimelineError::NotFound(*ttid))
60 0 : }
61 : }
62 :
63 0 : static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
64 0 : Mutex::new(GlobalTimelinesState {
65 0 : timelines: HashMap::new(),
66 0 : conf: None,
67 0 : broker_active_set: Arc::new(TimelinesSet::default()),
68 0 : load_lock: Arc::new(tokio::sync::Mutex::new(TimelineLoadLock)),
69 0 : })
70 0 : });
71 :
/// Zero-sized namespace type: every operation on the global timelines map is
/// exposed as an associated function of `GlobalTimelines`.
pub struct GlobalTimelines;
74 :
75 : impl GlobalTimelines {
76 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
77 0 : pub async fn init(conf: SafeKeeperConf) -> Result<()> {
78 0 : // clippy isn't smart enough to understand that drop(state) releases the
79 0 : // lock, so use explicit block
80 0 : let tenants_dir = {
81 0 : let mut state = TIMELINES_STATE.lock().unwrap();
82 0 : state.conf = Some(conf);
83 0 :
84 0 : // Iterate through all directories and load tenants for all directories
85 0 : // named as a valid tenant_id.
86 0 : state.get_conf().workdir.clone()
87 0 : };
88 0 : let mut tenant_count = 0;
89 0 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
90 0 : .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
91 : {
92 0 : match &tenants_dir_entry {
93 0 : Ok(tenants_dir_entry) => {
94 0 : if let Ok(tenant_id) =
95 0 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
96 : {
97 0 : tenant_count += 1;
98 0 : GlobalTimelines::load_tenant_timelines(tenant_id).await?;
99 0 : }
100 : }
101 0 : Err(e) => error!(
102 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
103 : tenants_dir_entry, tenants_dir, e
104 : ),
105 : }
106 : }
107 :
108 0 : info!(
109 0 : "found {} tenants directories, successfully loaded {} timelines",
110 0 : tenant_count,
111 0 : TIMELINES_STATE.lock().unwrap().timelines.len()
112 : );
113 0 : Ok(())
114 0 : }
115 :
116 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
117 : /// errors if any.
118 : ///
119 : /// It is async for update_status_notify sake. Since TIMELINES_STATE lock is
120 : /// sync and there is no important reason to make it async (it is always
121 : /// held for a short while) we just lock and unlock it for each timeline --
122 : /// this function is called during init when nothing else is running, so
123 : /// this is fine.
124 0 : async fn load_tenant_timelines(tenant_id: TenantId) -> Result<()> {
125 0 : let (conf, broker_active_set) = {
126 0 : let state = TIMELINES_STATE.lock().unwrap();
127 0 : state.get_dependencies()
128 0 : };
129 0 :
130 0 : let timelines_dir = get_tenant_dir(&conf, &tenant_id);
131 0 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
132 0 : .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
133 : {
134 0 : match &timelines_dir_entry {
135 0 : Ok(timeline_dir_entry) => {
136 0 : if let Ok(timeline_id) =
137 0 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
138 : {
139 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
140 0 : match Timeline::load_timeline(&conf, ttid) {
141 0 : Ok(timeline) => {
142 0 : let tli = Arc::new(timeline);
143 0 : TIMELINES_STATE
144 0 : .lock()
145 0 : .unwrap()
146 0 : .timelines
147 0 : .insert(ttid, tli.clone());
148 0 : tli.bootstrap(&conf, broker_active_set.clone());
149 0 : }
150 : // If we can't load a timeline, it's most likely because of a corrupted
151 : // directory. We will log an error and won't allow to delete/recreate
152 : // this timeline. The only way to fix this timeline is to repair manually
153 : // and restart the safekeeper.
154 0 : Err(e) => error!(
155 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
156 : timeline_id, tenant_id, e
157 : ),
158 : }
159 0 : }
160 : }
161 0 : Err(e) => error!(
162 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
163 : timelines_dir_entry, timelines_dir, e
164 : ),
165 : }
166 : }
167 :
168 0 : Ok(())
169 0 : }
170 :
171 : /// Take a lock for timeline loading.
172 0 : pub async fn loading_lock() -> Arc<tokio::sync::Mutex<TimelineLoadLock>> {
173 0 : TIMELINES_STATE.lock().unwrap().load_lock.clone()
174 0 : }
175 :
176 : /// Load timeline from disk to the memory.
177 0 : pub async fn load_timeline<'a>(
178 0 : _guard: &tokio::sync::MutexGuard<'a, TimelineLoadLock>,
179 0 : ttid: TenantTimelineId,
180 0 : ) -> Result<Arc<Timeline>> {
181 0 : let (conf, broker_active_set) = TIMELINES_STATE.lock().unwrap().get_dependencies();
182 0 :
183 0 : match Timeline::load_timeline(&conf, ttid) {
184 0 : Ok(timeline) => {
185 0 : let tli = Arc::new(timeline);
186 0 :
187 0 : // TODO: prevent concurrent timeline creation/loading
188 0 : TIMELINES_STATE
189 0 : .lock()
190 0 : .unwrap()
191 0 : .timelines
192 0 : .insert(ttid, tli.clone());
193 0 :
194 0 : tli.bootstrap(&conf, broker_active_set);
195 0 :
196 0 : Ok(tli)
197 : }
198 : // If we can't load a timeline, it's bad. Caller will figure it out.
199 0 : Err(e) => bail!("failed to load timeline {}, reason: {:?}", ttid, e),
200 : }
201 0 : }
202 :
203 : /// Get the number of timelines in the map.
204 0 : pub fn timelines_count() -> usize {
205 0 : TIMELINES_STATE.lock().unwrap().timelines.len()
206 0 : }
207 :
208 : /// Get the global safekeeper config.
209 0 : pub fn get_global_config() -> SafeKeeperConf {
210 0 : TIMELINES_STATE.lock().unwrap().get_conf().clone()
211 0 : }
212 :
213 0 : pub fn get_global_broker_active_set() -> Arc<TimelinesSet> {
214 0 : TIMELINES_STATE.lock().unwrap().broker_active_set.clone()
215 0 : }
216 :
217 : /// Create a new timeline with the given id. If the timeline already exists, returns
218 : /// an existing timeline.
219 0 : pub async fn create(
220 0 : ttid: TenantTimelineId,
221 0 : server_info: ServerInfo,
222 0 : commit_lsn: Lsn,
223 0 : local_start_lsn: Lsn,
224 0 : ) -> Result<Arc<Timeline>> {
225 0 : let (conf, broker_active_set) = {
226 0 : let state = TIMELINES_STATE.lock().unwrap();
227 0 : if let Ok(timeline) = state.get(&ttid) {
228 : // Timeline already exists, return it.
229 0 : return Ok(timeline);
230 0 : }
231 0 : state.get_dependencies()
232 0 : };
233 0 :
234 0 : info!("creating new timeline {}", ttid);
235 :
236 0 : let timeline = Arc::new(Timeline::create_empty(
237 0 : &conf,
238 0 : ttid,
239 0 : server_info,
240 0 : commit_lsn,
241 0 : local_start_lsn,
242 0 : )?);
243 :
244 : // Take a lock and finish the initialization holding this mutex. No other threads
245 : // can interfere with creation after we will insert timeline into the map.
246 : {
247 0 : let mut shared_state = timeline.write_shared_state().await;
248 :
249 : // We can get a race condition here in case of concurrent create calls, but only
250 : // in theory. create() will return valid timeline on the next try.
251 0 : TIMELINES_STATE
252 0 : .lock()
253 0 : .unwrap()
254 0 : .try_insert(timeline.clone())?;
255 :
256 : // Write the new timeline to the disk and start background workers.
257 : // Bootstrap is transactional, so if it fails, the timeline will be deleted,
258 : // and the state on disk should remain unchanged.
259 0 : if let Err(e) = timeline
260 0 : .init_new(&mut shared_state, &conf, broker_active_set)
261 0 : .await
262 : {
263 : // Note: the most likely reason for init failure is that the timeline
264 : // directory already exists on disk. This happens when timeline is corrupted
265 : // and wasn't loaded from disk on startup because of that. We want to preserve
266 : // the timeline directory in this case, for further inspection.
267 :
268 : // TODO: this is an unusual error, perhaps we should send it to sentry
269 : // TODO: compute will try to create timeline every second, we should add backoff
270 0 : error!("failed to init new timeline {}: {}", ttid, e);
271 :
272 : // Timeline failed to init, it cannot be used. Remove it from the map.
273 0 : TIMELINES_STATE.lock().unwrap().timelines.remove(&ttid);
274 0 : return Err(e);
275 0 : }
276 0 : // We are done with bootstrap, release the lock, return the timeline.
277 0 : // {} block forces release before .await
278 0 : }
279 0 : Ok(timeline)
280 0 : }
281 :
282 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
283 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
284 : /// i.e. loaded in memory and not cancelled.
285 0 : pub fn get(ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
286 0 : let res = TIMELINES_STATE.lock().unwrap().get(&ttid);
287 0 :
288 0 : match res {
289 0 : Ok(tli) => {
290 0 : if tli.is_cancelled() {
291 0 : return Err(TimelineError::Cancelled(ttid));
292 0 : }
293 0 : Ok(tli)
294 : }
295 0 : _ => res,
296 : }
297 0 : }
298 :
299 : /// Returns all timelines. This is used for background timeline processes.
300 0 : pub fn get_all() -> Vec<Arc<Timeline>> {
301 0 : let global_lock = TIMELINES_STATE.lock().unwrap();
302 0 : global_lock
303 0 : .timelines
304 0 : .values()
305 0 : .filter(|t| !t.is_cancelled())
306 0 : .cloned()
307 0 : .collect()
308 0 : }
309 :
310 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
311 : /// and that's why it can return cancelled timelines, to retry deleting them.
312 0 : fn get_all_for_tenant(tenant_id: TenantId) -> Vec<Arc<Timeline>> {
313 0 : let global_lock = TIMELINES_STATE.lock().unwrap();
314 0 : global_lock
315 0 : .timelines
316 0 : .values()
317 0 : .filter(|t| t.ttid.tenant_id == tenant_id)
318 0 : .cloned()
319 0 : .collect()
320 0 : }
321 :
322 : /// Cancels timeline, then deletes the corresponding data directory.
323 : /// If only_local, doesn't remove WAL segments in remote storage.
324 0 : pub async fn delete(
325 0 : ttid: &TenantTimelineId,
326 0 : only_local: bool,
327 0 : ) -> Result<TimelineDeleteForceResult> {
328 0 : let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid);
329 0 : match tli_res {
330 0 : Ok(timeline) => {
331 0 : let was_active = timeline.broker_active.load(Ordering::Relaxed);
332 :
333 : // Take a lock and finish the deletion holding this mutex.
334 0 : let mut shared_state = timeline.write_shared_state().await;
335 :
336 0 : info!("deleting timeline {}, only_local={}", ttid, only_local);
337 0 : let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
338 :
339 : // Remove timeline from the map.
340 : // FIXME: re-enable it once we fix the issue with recreation of deleted timelines
341 : // https://github.com/neondatabase/neon/issues/3146
342 : // TIMELINES_STATE.lock().unwrap().timelines.remove(ttid);
343 :
344 0 : Ok(TimelineDeleteForceResult {
345 0 : dir_existed,
346 0 : was_active, // TODO: we probably should remove this field
347 0 : })
348 : }
349 : Err(_) => {
350 : // Timeline is not memory, but it may still exist on disk in broken state.
351 0 : let dir_path = get_timeline_dir(TIMELINES_STATE.lock().unwrap().get_conf(), ttid);
352 0 : let dir_existed = delete_dir(dir_path)?;
353 :
354 0 : Ok(TimelineDeleteForceResult {
355 0 : dir_existed,
356 0 : was_active: false,
357 0 : })
358 : }
359 : }
360 0 : }
361 :
362 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
363 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
364 : /// created simultaneously. In that case the function will return error and the caller should
365 : /// retry tenant deletion again later.
366 : ///
367 : /// If only_local, doesn't remove WAL segments in remote storage.
368 0 : pub async fn delete_force_all_for_tenant(
369 0 : tenant_id: &TenantId,
370 0 : only_local: bool,
371 0 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
372 0 : info!("deleting all timelines for tenant {}", tenant_id);
373 0 : let to_delete = Self::get_all_for_tenant(*tenant_id);
374 0 :
375 0 : let mut err = None;
376 0 :
377 0 : let mut deleted = HashMap::new();
378 0 : for tli in &to_delete {
379 0 : match Self::delete(&tli.ttid, only_local).await {
380 0 : Ok(result) => {
381 0 : deleted.insert(tli.ttid, result);
382 0 : }
383 0 : Err(e) => {
384 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
385 : // Save error to return later.
386 0 : err = Some(e);
387 : }
388 : }
389 : }
390 :
391 : // If there was an error, return it.
392 0 : if let Some(e) = err {
393 0 : return Err(e);
394 0 : }
395 0 :
396 0 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
397 0 : // Note that we could concurrently create new timelines while we were deleting them,
398 0 : // so the directory may be not empty. In this case timelines will have bad state
399 0 : // and timeline background jobs can panic.
400 0 : delete_dir(get_tenant_dir(
401 0 : TIMELINES_STATE.lock().unwrap().get_conf(),
402 0 : tenant_id,
403 0 : ))?;
404 :
405 : // FIXME: we temporarily disabled removing timelines from the map, see `delete_force`
406 : // let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
407 : // if !tlis_after_delete.is_empty() {
408 : // // Some timelines were created while we were deleting them, returning error
409 : // // to the caller, so it can retry later.
410 : // bail!(
411 : // "failed to delete all timelines for tenant {}: some timelines were created while we were deleting them",
412 : // tenant_id
413 : // );
414 : // }
415 :
416 0 : Ok(deleted)
417 0 : }
418 : }
419 :
420 : #[derive(Clone, Copy, Serialize)]
421 : pub struct TimelineDeleteForceResult {
422 : pub dir_existed: bool,
423 : pub was_active: bool,
424 : }
425 :
426 : /// Deletes directory and it's contents. Returns false if directory does not exist.
427 0 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
428 0 : match std::fs::remove_dir_all(path) {
429 0 : Ok(_) => Ok(true),
430 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
431 0 : Err(e) => Err(e.into()),
432 : }
433 0 : }
|