Line data Source code
1 : //! This module contains global `(tenant_id, timeline_id)` -> `Arc<Timeline>` mapping.
2 : //! All timelines should always be present in this map, this is done by loading them
3 : //! all from the disk on startup and keeping them in memory.
4 :
5 : use crate::defaults::DEFAULT_EVICTION_CONCURRENCY;
6 : use crate::rate_limit::RateLimiter;
7 : use crate::state::TimelinePersistentState;
8 : use crate::timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError};
9 : use crate::timelines_set::TimelinesSet;
10 : use crate::wal_storage::Storage;
11 : use crate::{control_file, wal_storage, SafeKeeperConf};
12 : use anyhow::{bail, Context, Result};
13 : use camino::Utf8PathBuf;
14 : use camino_tempfile::Utf8TempDir;
15 : use safekeeper_api::ServerInfo;
16 : use serde::Serialize;
17 : use std::collections::HashMap;
18 : use std::str::FromStr;
19 : use std::sync::atomic::Ordering;
20 : use std::sync::{Arc, Mutex};
21 : use std::time::{Duration, Instant};
22 : use tokio::fs;
23 : use tracing::*;
24 : use utils::crashsafe::{durable_rename, fsync_async_opt};
25 : use utils::id::{TenantId, TenantTimelineId, TimelineId};
26 : use utils::lsn::Lsn;
27 :
28 : // Timeline entry in the global map: either a ready timeline, or mark that it is
29 : // being created.
30 : #[derive(Clone)]
31 : enum GlobalMapTimeline {
32 : CreationInProgress,
33 : Timeline(Arc<Timeline>),
34 : }
35 :
36 : struct GlobalTimelinesState {
37 : timelines: HashMap<TenantTimelineId, GlobalMapTimeline>,
38 :
39 : // A tombstone indicates this timeline used to exist has been deleted. These are used to prevent
40 : // on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
41 : // this map is dropped on restart.
42 : tombstones: HashMap<TenantTimelineId, Instant>,
43 :
44 : conf: Arc<SafeKeeperConf>,
45 : broker_active_set: Arc<TimelinesSet>,
46 : global_rate_limiter: RateLimiter,
47 : }
48 :
49 : impl GlobalTimelinesState {
50 : /// Get dependencies for a timeline constructor.
51 0 : fn get_dependencies(&self) -> (Arc<SafeKeeperConf>, Arc<TimelinesSet>, RateLimiter) {
52 0 : (
53 0 : self.conf.clone(),
54 0 : self.broker_active_set.clone(),
55 0 : self.global_rate_limiter.clone(),
56 0 : )
57 0 : }
58 :
59 : /// Get timeline from the map. Returns error if timeline doesn't exist or
60 : /// creation is in progress.
61 0 : fn get(&self, ttid: &TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
62 0 : match self.timelines.get(ttid).cloned() {
63 0 : Some(GlobalMapTimeline::Timeline(tli)) => Ok(tli),
64 : Some(GlobalMapTimeline::CreationInProgress) => {
65 0 : Err(TimelineError::CreationInProgress(*ttid))
66 : }
67 0 : None => Err(TimelineError::NotFound(*ttid)),
68 : }
69 0 : }
70 :
71 0 : fn delete(&mut self, ttid: TenantTimelineId) {
72 0 : self.timelines.remove(&ttid);
73 0 : self.tombstones.insert(ttid, Instant::now());
74 0 : }
75 : }
76 :
77 : /// A struct used to manage access to the global timelines map.
78 : pub struct GlobalTimelines {
79 : state: Mutex<GlobalTimelinesState>,
80 : }
81 :
82 : impl GlobalTimelines {
83 : /// Create a new instance of the global timelines map.
84 0 : pub fn new(conf: Arc<SafeKeeperConf>) -> Self {
85 0 : Self {
86 0 : state: Mutex::new(GlobalTimelinesState {
87 0 : timelines: HashMap::new(),
88 0 : tombstones: HashMap::new(),
89 0 : conf,
90 0 : broker_active_set: Arc::new(TimelinesSet::default()),
91 0 : global_rate_limiter: RateLimiter::new(1, 1),
92 0 : }),
93 0 : }
94 0 : }
95 :
96 : /// Inject dependencies needed for the timeline constructors and load all timelines to memory.
97 0 : pub async fn init(&self) -> Result<()> {
98 0 : // clippy isn't smart enough to understand that drop(state) releases the
99 0 : // lock, so use explicit block
100 0 : let tenants_dir = {
101 0 : let mut state = self.state.lock().unwrap();
102 0 : state.global_rate_limiter = RateLimiter::new(
103 0 : state.conf.partial_backup_concurrency,
104 0 : DEFAULT_EVICTION_CONCURRENCY,
105 0 : );
106 0 :
107 0 : // Iterate through all directories and load tenants for all directories
108 0 : // named as a valid tenant_id.
109 0 : state.conf.workdir.clone()
110 0 : };
111 0 : let mut tenant_count = 0;
112 0 : for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
113 0 : .with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
114 : {
115 0 : match &tenants_dir_entry {
116 0 : Ok(tenants_dir_entry) => {
117 0 : if let Ok(tenant_id) =
118 0 : TenantId::from_str(tenants_dir_entry.file_name().to_str().unwrap_or(""))
119 : {
120 0 : tenant_count += 1;
121 0 : self.load_tenant_timelines(tenant_id).await?;
122 0 : }
123 : }
124 0 : Err(e) => error!(
125 0 : "failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
126 : tenants_dir_entry, tenants_dir, e
127 : ),
128 : }
129 : }
130 :
131 0 : info!(
132 0 : "found {} tenants directories, successfully loaded {} timelines",
133 0 : tenant_count,
134 0 : self.state.lock().unwrap().timelines.len()
135 : );
136 0 : Ok(())
137 0 : }
138 :
139 : /// Loads all timelines for the given tenant to memory. Returns fs::read_dir
140 : /// errors if any.
141 : ///
142 : /// It is async, but self.state lock is sync and there is no important
143 : /// reason to make it async (it is always held for a short while), so we
144 : /// just lock and unlock it for each timeline -- this function is called
145 : /// during init when nothing else is running, so this is fine.
146 0 : async fn load_tenant_timelines(&self, tenant_id: TenantId) -> Result<()> {
147 0 : let (conf, broker_active_set, partial_backup_rate_limiter) = {
148 0 : let state = self.state.lock().unwrap();
149 0 : state.get_dependencies()
150 0 : };
151 0 :
152 0 : let timelines_dir = get_tenant_dir(&conf, &tenant_id);
153 0 : for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
154 0 : .with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
155 : {
156 0 : match &timelines_dir_entry {
157 0 : Ok(timeline_dir_entry) => {
158 0 : if let Ok(timeline_id) =
159 0 : TimelineId::from_str(timeline_dir_entry.file_name().to_str().unwrap_or(""))
160 : {
161 0 : let ttid = TenantTimelineId::new(tenant_id, timeline_id);
162 0 : match Timeline::load_timeline(conf.clone(), ttid) {
163 0 : Ok(tli) => {
164 0 : let mut shared_state = tli.write_shared_state().await;
165 0 : self.state
166 0 : .lock()
167 0 : .unwrap()
168 0 : .timelines
169 0 : .insert(ttid, GlobalMapTimeline::Timeline(tli.clone()));
170 0 : tli.bootstrap(
171 0 : &mut shared_state,
172 0 : &conf,
173 0 : broker_active_set.clone(),
174 0 : partial_backup_rate_limiter.clone(),
175 0 : );
176 : }
177 : // If we can't load a timeline, it's most likely because of a corrupted
178 : // directory. We will log an error and won't allow to delete/recreate
179 : // this timeline. The only way to fix this timeline is to repair manually
180 : // and restart the safekeeper.
181 0 : Err(e) => error!(
182 0 : "failed to load timeline {} for tenant {}, reason: {:?}",
183 : timeline_id, tenant_id, e
184 : ),
185 : }
186 0 : }
187 : }
188 0 : Err(e) => error!(
189 0 : "failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
190 : timelines_dir_entry, timelines_dir, e
191 : ),
192 : }
193 : }
194 :
195 0 : Ok(())
196 0 : }
197 :
198 : /// Get the number of timelines in the map.
199 0 : pub fn timelines_count(&self) -> usize {
200 0 : self.state.lock().unwrap().timelines.len()
201 0 : }
202 :
203 : /// Get the global safekeeper config.
204 0 : pub fn get_global_config(&self) -> Arc<SafeKeeperConf> {
205 0 : self.state.lock().unwrap().conf.clone()
206 0 : }
207 :
208 0 : pub fn get_global_broker_active_set(&self) -> Arc<TimelinesSet> {
209 0 : self.state.lock().unwrap().broker_active_set.clone()
210 0 : }
211 :
212 : /// Create a new timeline with the given id. If the timeline already exists, returns
213 : /// an existing timeline.
214 0 : pub(crate) async fn create(
215 0 : &self,
216 0 : ttid: TenantTimelineId,
217 0 : server_info: ServerInfo,
218 0 : commit_lsn: Lsn,
219 0 : local_start_lsn: Lsn,
220 0 : ) -> Result<Arc<Timeline>> {
221 0 : let (conf, _, _) = {
222 0 : let state = self.state.lock().unwrap();
223 0 : if let Ok(timeline) = state.get(&ttid) {
224 : // Timeline already exists, return it.
225 0 : return Ok(timeline);
226 0 : }
227 0 :
228 0 : if state.tombstones.contains_key(&ttid) {
229 0 : anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
230 0 : }
231 0 :
232 0 : state.get_dependencies()
233 0 : };
234 0 :
235 0 : info!("creating new timeline {}", ttid);
236 :
237 : // Do on disk initialization in tmp dir.
238 0 : let (_tmp_dir, tmp_dir_path) = create_temp_timeline_dir(&conf, ttid).await?;
239 :
240 : // TODO: currently we create only cfile. It would be reasonable to
241 : // immediately initialize first WAL segment as well.
242 0 : let state =
243 0 : TimelinePersistentState::new(&ttid, server_info, vec![], commit_lsn, local_start_lsn)?;
244 0 : control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
245 0 : let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
246 0 : Ok(timeline)
247 0 : }
248 :
249 : /// Move timeline from a temp directory to the main storage, and load it to
250 : /// the global map. Creating timeline in this way ensures atomicity: rename
251 : /// is atomic, so either move of the whole datadir succeeds or it doesn't,
252 : /// but corrupted data dir shouldn't be possible.
253 : ///
254 : /// We'd like to avoid holding map lock while doing IO, so it's a 3 step
255 : /// process:
256 : /// 1) check the global map that timeline doesn't exist and mark that we're
257 : /// creating it;
258 : /// 2) move the directory and load the timeline
259 : /// 3) take lock again and insert the timeline into the global map.
260 0 : pub async fn load_temp_timeline(
261 0 : &self,
262 0 : ttid: TenantTimelineId,
263 0 : tmp_path: &Utf8PathBuf,
264 0 : check_tombstone: bool,
265 0 : ) -> Result<Arc<Timeline>> {
266 : // Check for existence and mark that we're creating it.
267 0 : let (conf, broker_active_set, partial_backup_rate_limiter) = {
268 0 : let mut state = self.state.lock().unwrap();
269 0 : match state.timelines.get(&ttid) {
270 : Some(GlobalMapTimeline::CreationInProgress) => {
271 0 : bail!(TimelineError::CreationInProgress(ttid));
272 : }
273 : Some(GlobalMapTimeline::Timeline(_)) => {
274 0 : bail!(TimelineError::AlreadyExists(ttid));
275 : }
276 0 : _ => {}
277 0 : }
278 0 : if check_tombstone {
279 0 : if state.tombstones.contains_key(&ttid) {
280 0 : anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
281 0 : }
282 : } else {
283 : // We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
284 : // that the human doing this manual intervention knows what they are doing, and remove its tombstone.
285 0 : if state.tombstones.remove(&ttid).is_some() {
286 0 : warn!("un-deleted timeline {ttid}");
287 0 : }
288 : }
289 0 : state
290 0 : .timelines
291 0 : .insert(ttid, GlobalMapTimeline::CreationInProgress);
292 0 : state.get_dependencies()
293 0 : };
294 0 :
295 0 : // Do the actual move and reflect the result in the map.
296 0 : match GlobalTimelines::install_temp_timeline(ttid, tmp_path, conf.clone()).await {
297 0 : Ok(timeline) => {
298 0 : let mut timeline_shared_state = timeline.write_shared_state().await;
299 0 : let mut state = self.state.lock().unwrap();
300 0 : assert!(matches!(
301 0 : state.timelines.get(&ttid),
302 : Some(GlobalMapTimeline::CreationInProgress)
303 : ));
304 :
305 0 : state
306 0 : .timelines
307 0 : .insert(ttid, GlobalMapTimeline::Timeline(timeline.clone()));
308 0 : drop(state);
309 0 : timeline.bootstrap(
310 0 : &mut timeline_shared_state,
311 0 : &conf,
312 0 : broker_active_set,
313 0 : partial_backup_rate_limiter,
314 0 : );
315 0 : drop(timeline_shared_state);
316 0 : Ok(timeline)
317 : }
318 0 : Err(e) => {
319 0 : // Init failed, remove the marker from the map
320 0 : let mut state = self.state.lock().unwrap();
321 0 : assert!(matches!(
322 0 : state.timelines.get(&ttid),
323 : Some(GlobalMapTimeline::CreationInProgress)
324 : ));
325 0 : state.timelines.remove(&ttid);
326 0 : Err(e)
327 : }
328 : }
329 0 : }
330 :
331 : /// Main part of load_temp_timeline: do the move and load.
332 0 : async fn install_temp_timeline(
333 0 : ttid: TenantTimelineId,
334 0 : tmp_path: &Utf8PathBuf,
335 0 : conf: Arc<SafeKeeperConf>,
336 0 : ) -> Result<Arc<Timeline>> {
337 0 : let tenant_path = get_tenant_dir(conf.as_ref(), &ttid.tenant_id);
338 0 : let timeline_path = get_timeline_dir(conf.as_ref(), &ttid);
339 0 :
340 0 : // We must have already checked that timeline doesn't exist in the map,
341 0 : // but there might be existing datadir: if timeline is corrupted it is
342 0 : // not loaded. We don't want to overwrite such a dir, so check for its
343 0 : // existence.
344 0 : match fs::metadata(&timeline_path).await {
345 : Ok(_) => {
346 : // Timeline directory exists on disk, we should leave state unchanged
347 : // and return error.
348 0 : bail!(TimelineError::Invalid(ttid));
349 : }
350 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
351 0 : Err(e) => {
352 0 : return Err(e.into());
353 : }
354 : }
355 :
356 0 : info!(
357 0 : "moving timeline {} from {} to {}",
358 : ttid, tmp_path, timeline_path
359 : );
360 :
361 : // Now it is safe to move the timeline directory to the correct
362 : // location. First, create tenant directory. Ignore error if it already
363 : // exists.
364 0 : if let Err(e) = tokio::fs::create_dir(&tenant_path).await {
365 0 : if e.kind() != std::io::ErrorKind::AlreadyExists {
366 0 : return Err(e.into());
367 0 : }
368 0 : }
369 : // fsync it
370 0 : fsync_async_opt(&tenant_path, !conf.no_sync).await?;
371 : // and its creation
372 0 : fsync_async_opt(&conf.workdir, !conf.no_sync).await?;
373 :
374 : // Do the move.
375 0 : durable_rename(tmp_path, &timeline_path, !conf.no_sync).await?;
376 :
377 0 : Timeline::load_timeline(conf, ttid)
378 0 : }
379 :
380 : /// Get a timeline from the global map. If it's not present, it doesn't exist on disk,
381 : /// or was corrupted and couldn't be loaded on startup. Returned timeline is always valid,
382 : /// i.e. loaded in memory and not cancelled.
383 0 : pub(crate) fn get(&self, ttid: TenantTimelineId) -> Result<Arc<Timeline>, TimelineError> {
384 0 : let tli_res = {
385 0 : let state = self.state.lock().unwrap();
386 0 : state.get(&ttid)
387 0 : };
388 0 : match tli_res {
389 0 : Ok(tli) => {
390 0 : if tli.is_cancelled() {
391 0 : return Err(TimelineError::Cancelled(ttid));
392 0 : }
393 0 : Ok(tli)
394 : }
395 0 : _ => tli_res,
396 : }
397 0 : }
398 :
399 : /// Returns all timelines. This is used for background timeline processes.
400 0 : pub fn get_all(&self) -> Vec<Arc<Timeline>> {
401 0 : let global_lock = self.state.lock().unwrap();
402 0 : global_lock
403 0 : .timelines
404 0 : .values()
405 0 : .filter_map(|t| match t {
406 0 : GlobalMapTimeline::Timeline(t) => {
407 0 : if t.is_cancelled() {
408 0 : None
409 : } else {
410 0 : Some(t.clone())
411 : }
412 : }
413 0 : _ => None,
414 0 : })
415 0 : .collect()
416 0 : }
417 :
418 : /// Returns all timelines belonging to a given tenant. Used for deleting all timelines of a tenant,
419 : /// and that's why it can return cancelled timelines, to retry deleting them.
420 0 : fn get_all_for_tenant(&self, tenant_id: TenantId) -> Vec<Arc<Timeline>> {
421 0 : let global_lock = self.state.lock().unwrap();
422 0 : global_lock
423 0 : .timelines
424 0 : .values()
425 0 : .filter_map(|t| match t {
426 0 : GlobalMapTimeline::Timeline(t) => Some(t.clone()),
427 0 : _ => None,
428 0 : })
429 0 : .filter(|t| t.ttid.tenant_id == tenant_id)
430 0 : .collect()
431 0 : }
432 :
433 : /// Cancels timeline, then deletes the corresponding data directory.
434 : /// If only_local, doesn't remove WAL segments in remote storage.
435 0 : pub(crate) async fn delete(
436 0 : &self,
437 0 : ttid: &TenantTimelineId,
438 0 : only_local: bool,
439 0 : ) -> Result<TimelineDeleteForceResult> {
440 0 : let tli_res = {
441 0 : let state = self.state.lock().unwrap();
442 0 :
443 0 : if state.tombstones.contains_key(ttid) {
444 : // Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
445 0 : info!("Timeline {ttid} was already deleted");
446 0 : return Ok(TimelineDeleteForceResult {
447 0 : dir_existed: false,
448 0 : was_active: false,
449 0 : });
450 0 : }
451 0 :
452 0 : state.get(ttid)
453 : };
454 :
455 0 : let result = match tli_res {
456 0 : Ok(timeline) => {
457 0 : let was_active = timeline.broker_active.load(Ordering::Relaxed);
458 0 :
459 0 : info!("deleting timeline {}, only_local={}", ttid, only_local);
460 0 : timeline.shutdown().await;
461 :
462 : // Take a lock and finish the deletion holding this mutex.
463 0 : let mut shared_state = timeline.write_shared_state().await;
464 :
465 0 : let dir_existed = timeline.delete(&mut shared_state, only_local).await?;
466 :
467 0 : Ok(TimelineDeleteForceResult {
468 0 : dir_existed,
469 0 : was_active, // TODO: we probably should remove this field
470 0 : })
471 : }
472 : Err(_) => {
473 : // Timeline is not memory, but it may still exist on disk in broken state.
474 0 : let dir_path = get_timeline_dir(self.state.lock().unwrap().conf.as_ref(), ttid);
475 0 : let dir_existed = delete_dir(dir_path)?;
476 :
477 0 : Ok(TimelineDeleteForceResult {
478 0 : dir_existed,
479 0 : was_active: false,
480 0 : })
481 : }
482 : };
483 :
484 : // Finalize deletion, by dropping Timeline objects and storing smaller tombstones. The tombstones
485 : // are used to prevent still-running computes from re-creating the same timeline when they send data,
486 : // and to speed up repeated deletion calls by avoiding re-listing objects.
487 0 : self.state.lock().unwrap().delete(*ttid);
488 0 :
489 0 : result
490 0 : }
491 :
492 : /// Deactivates and deletes all timelines for the tenant. Returns map of all timelines which
493 : /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are
494 : /// created simultaneously. In that case the function will return error and the caller should
495 : /// retry tenant deletion again later.
496 : ///
497 : /// If only_local, doesn't remove WAL segments in remote storage.
498 0 : pub async fn delete_force_all_for_tenant(
499 0 : &self,
500 0 : tenant_id: &TenantId,
501 0 : only_local: bool,
502 0 : ) -> Result<HashMap<TenantTimelineId, TimelineDeleteForceResult>> {
503 0 : info!("deleting all timelines for tenant {}", tenant_id);
504 0 : let to_delete = self.get_all_for_tenant(*tenant_id);
505 0 :
506 0 : let mut err = None;
507 0 :
508 0 : let mut deleted = HashMap::new();
509 0 : for tli in &to_delete {
510 0 : match self.delete(&tli.ttid, only_local).await {
511 0 : Ok(result) => {
512 0 : deleted.insert(tli.ttid, result);
513 0 : }
514 0 : Err(e) => {
515 0 : error!("failed to delete timeline {}: {}", tli.ttid, e);
516 : // Save error to return later.
517 0 : err = Some(e);
518 : }
519 : }
520 : }
521 :
522 : // If there was an error, return it.
523 0 : if let Some(e) = err {
524 0 : return Err(e);
525 0 : }
526 0 :
527 0 : // There may be broken timelines on disk, so delete the whole tenant dir as well.
528 0 : // Note that we could concurrently create new timelines while we were deleting them,
529 0 : // so the directory may be not empty. In this case timelines will have bad state
530 0 : // and timeline background jobs can panic.
531 0 : delete_dir(get_tenant_dir(
532 0 : self.state.lock().unwrap().conf.as_ref(),
533 0 : tenant_id,
534 0 : ))?;
535 :
536 0 : Ok(deleted)
537 0 : }
538 :
539 0 : pub fn housekeeping(&self, tombstone_ttl: &Duration) {
540 0 : let mut state = self.state.lock().unwrap();
541 0 :
542 0 : // We keep tombstones long enough to have a good chance of preventing rogue computes from re-creating deleted
543 0 : // timelines. If a compute kept running for longer than this TTL (or across a safekeeper restart) then they
544 0 : // may recreate a deleted timeline.
545 0 : let now = Instant::now();
546 0 : state
547 0 : .tombstones
548 0 : .retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
549 0 : }
550 : }
551 :
552 : #[derive(Clone, Copy, Serialize)]
553 : pub struct TimelineDeleteForceResult {
554 : pub dir_existed: bool,
555 : pub was_active: bool,
556 : }
557 :
558 : /// Deletes directory and it's contents. Returns false if directory does not exist.
559 0 : fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
560 0 : match std::fs::remove_dir_all(path) {
561 0 : Ok(_) => Ok(true),
562 0 : Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),
563 0 : Err(e) => Err(e.into()),
564 : }
565 0 : }
566 :
567 : /// Create temp directory for a new timeline. It needs to be located on the same
568 : /// filesystem as the rest of the timelines. It will be automatically deleted when
569 : /// Utf8TempDir goes out of scope.
570 0 : pub async fn create_temp_timeline_dir(
571 0 : conf: &SafeKeeperConf,
572 0 : ttid: TenantTimelineId,
573 0 : ) -> Result<(Utf8TempDir, Utf8PathBuf)> {
574 0 : let temp_base = conf.workdir.join("tmp");
575 0 :
576 0 : tokio::fs::create_dir_all(&temp_base).await?;
577 :
578 0 : let tli_dir = camino_tempfile::Builder::new()
579 0 : .suffix("_temptli")
580 0 : .prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
581 0 : .tempdir_in(temp_base)?;
582 :
583 0 : let tli_dir_path = tli_dir.path().to_path_buf();
584 0 :
585 0 : Ok((tli_dir, tli_dir_path))
586 0 : }
587 :
588 : /// Do basic validation of a temp timeline, before moving it to the global map.
589 0 : pub async fn validate_temp_timeline(
590 0 : conf: &SafeKeeperConf,
591 0 : ttid: TenantTimelineId,
592 0 : path: &Utf8PathBuf,
593 0 : ) -> Result<(Lsn, Lsn)> {
594 0 : let control_path = path.join("safekeeper.control");
595 :
596 0 : let control_store = control_file::FileStorage::load_control_file(control_path)?;
597 0 : if control_store.server.wal_seg_size == 0 {
598 0 : bail!("wal_seg_size is not set");
599 0 : }
600 :
601 0 : let wal_store = wal_storage::PhysicalStorage::new(&ttid, path, &control_store, conf.no_sync)?;
602 :
603 0 : let commit_lsn = control_store.commit_lsn;
604 0 : let flush_lsn = wal_store.flush_lsn();
605 0 :
606 0 : Ok((commit_lsn, flush_lsn))
607 0 : }
|