Line data Source code
1 : use std::collections::{HashMap, hash_map};
2 : use std::str::FromStr;
3 :
4 : use anyhow::Context;
5 : use camino::{Utf8Path, Utf8PathBuf};
6 : use utils::lsn::Lsn;
7 :
8 : use crate::is_temporary;
9 : use crate::tenant::ephemeral_file::is_ephemeral_file;
10 : use crate::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
11 : use crate::tenant::remote_timeline_client::{self};
12 : use crate::tenant::storage_layer::LayerName;
13 :
14 : /// Identified files in the timeline directory.
15 : pub(super) enum Discovered {
16 : /// The only one we care about
17 : Layer(LayerName, LocalLayerFileMetadata),
18 : /// Old ephmeral files from previous launches, should be removed
19 : Ephemeral(String),
20 : /// Old temporary timeline files, unsure what these really are, should be removed
21 : Temporary(String),
22 : /// Temporary on-demand download files, should be removed
23 : TemporaryDownload(String),
24 : /// Backup file from previously future layers
25 : IgnoredBackup(Utf8PathBuf),
26 : /// Unrecognized, warn about these
27 : Unknown(String),
28 : }
29 :
30 : /// Scans the timeline directory for interesting files.
31 12 : pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovered>> {
32 12 : let mut ret = Vec::new();
33 :
34 32 : for direntry in path.read_dir_utf8()? {
35 32 : let direntry = direntry?;
36 32 : let file_name = direntry.file_name().to_string();
37 :
38 32 : let discovered = match LayerName::from_str(&file_name) {
39 32 : Ok(file_name) => {
40 32 : let file_size = direntry.metadata()?.len();
41 32 : Discovered::Layer(
42 32 : file_name,
43 32 : LocalLayerFileMetadata::new(direntry.path().to_owned(), file_size),
44 32 : )
45 : }
46 : Err(_) => {
47 0 : if file_name.ends_with(".old") {
48 : // ignore these
49 0 : Discovered::IgnoredBackup(direntry.path().to_owned())
50 0 : } else if remote_timeline_client::is_temp_download_file(direntry.path()) {
51 0 : Discovered::TemporaryDownload(file_name)
52 0 : } else if is_ephemeral_file(&file_name) {
53 0 : Discovered::Ephemeral(file_name)
54 0 : } else if is_temporary(direntry.path()) {
55 0 : Discovered::Temporary(file_name)
56 : } else {
57 0 : Discovered::Unknown(file_name)
58 : }
59 : }
60 : };
61 :
62 32 : ret.push(discovered);
63 : }
64 :
65 12 : Ok(ret)
66 12 : }
67 :
68 : /// Whereas `LayerFileMetadata` describes the metadata we would store in remote storage,
69 : /// this structure extends it with metadata describing the layer's presence in local storage.
70 : #[derive(Clone, Debug)]
71 : pub(super) struct LocalLayerFileMetadata {
72 : pub(super) file_size: u64,
73 : pub(super) local_path: Utf8PathBuf,
74 : }
75 :
76 : impl LocalLayerFileMetadata {
77 32 : pub fn new(local_path: Utf8PathBuf, file_size: u64) -> Self {
78 32 : Self {
79 32 : local_path,
80 32 : file_size,
81 32 : }
82 32 : }
83 : }
84 :
/// For a layer that is present in remote metadata, this type describes how to handle
/// it during startup: it is either Resident (and we have some metadata about a local file),
/// or it is Evicted (and we only have remote metadata).
#[derive(Clone, Debug)]
pub(super) enum Decision {
    /// The layer is not present locally; only the remote metadata is available.
    Evicted(LayerFileMetadata),
    /// The layer is present locally, and metadata matches: we may hook up this layer to the
    /// existing file in local storage.
    Resident {
        /// What was found on local disk (path and file size).
        local: LocalLayerFileMetadata,
        /// The matching entry from the remote index.
        remote: LayerFileMetadata,
    },
}
99 :
/// A layer needs to be left out of the layer map.
#[derive(Debug)]
pub(super) enum DismissedLayer {
    /// The related layer is in the future compared to disk_consistent_lsn, it must not be loaded.
    Future {
        /// `None` if the layer is only known through [`IndexPart`].
        local: Option<LocalLayerFileMetadata>,
    },
    /// The layer only exists locally.
    ///
    /// In order to make crash safe updates to layer map, we must dismiss layers which are only
    /// found locally or not yet included in the remote `index_part.json`.
    LocalOnly(LocalLayerFileMetadata),

    /// The layer exists in remote storage but the local layer's metadata (e.g. file size)
    /// does not match it
    BadMetadata(LocalLayerFileMetadata),
}
118 :
119 : /// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
120 12 : pub(super) fn reconcile(
121 12 : local_layers: Vec<(LayerName, LocalLayerFileMetadata)>,
122 12 : index_part: &IndexPart,
123 12 : disk_consistent_lsn: Lsn,
124 12 : ) -> Vec<(LayerName, Result<Decision, DismissedLayer>)> {
125 12 : let mut result = Vec::new();
126 12 :
127 12 : let mut remote_layers = HashMap::new();
128 :
129 : // Construct Decisions for layers that are found locally, if they're in remote metadata. Otherwise
130 : // construct DismissedLayers to get rid of them.
131 44 : for (layer_name, local_metadata) in local_layers {
132 32 : let Some(remote_metadata) = index_part.layer_metadata.get(&layer_name) else {
133 0 : result.push((layer_name, Err(DismissedLayer::LocalOnly(local_metadata))));
134 0 : continue;
135 : };
136 :
137 32 : if remote_metadata.file_size != local_metadata.file_size {
138 0 : result.push((layer_name, Err(DismissedLayer::BadMetadata(local_metadata))));
139 0 : continue;
140 32 : }
141 32 :
142 32 : remote_layers.insert(
143 32 : layer_name,
144 32 : Decision::Resident {
145 32 : local: local_metadata,
146 32 : remote: remote_metadata.clone(),
147 32 : },
148 32 : );
149 : }
150 :
151 : // Construct Decision for layers that were not found locally
152 12 : index_part
153 12 : .layer_metadata
154 12 : .iter()
155 32 : .for_each(|(name, metadata)| {
156 32 : if let hash_map::Entry::Vacant(entry) = remote_layers.entry(name.clone()) {
157 0 : entry.insert(Decision::Evicted(metadata.clone()));
158 32 : }
159 32 : });
160 12 :
161 12 : // For layers that were found in authoritative remote metadata, apply a final check that they are within
162 12 : // the disk_consistent_lsn.
163 32 : result.extend(remote_layers.into_iter().map(|(name, decision)| {
164 32 : if name.is_in_future(disk_consistent_lsn) {
165 0 : match decision {
166 0 : Decision::Evicted(_remote) => (name, Err(DismissedLayer::Future { local: None })),
167 : Decision::Resident {
168 0 : local,
169 0 : remote: _remote,
170 0 : } => (name, Err(DismissedLayer::Future { local: Some(local) })),
171 : }
172 : } else {
173 32 : (name, Ok(decision))
174 : }
175 32 : }));
176 12 :
177 12 : result
178 12 : }
179 :
180 0 : pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
181 0 : let file_name = path.file_name().expect("must be file path");
182 0 : tracing::debug!(kind, ?file_name, "cleaning up");
183 0 : std::fs::remove_file(path).with_context(|| format!("failed to remove {kind} at {path}"))
184 0 : }
185 :
186 0 : pub(super) fn cleanup_local_file_for_remote(local: &LocalLayerFileMetadata) -> anyhow::Result<()> {
187 0 : let local_size = local.file_size;
188 0 : let path = &local.local_path;
189 0 : let file_name = path.file_name().expect("must be file path");
190 0 : tracing::warn!(
191 0 : "removing local file {file_name:?} because it has unexpected length {local_size};"
192 : );
193 :
194 0 : std::fs::remove_file(path).with_context(|| format!("failed to remove layer at {path}"))
195 0 : }
196 :
197 0 : pub(super) fn cleanup_future_layer(
198 0 : path: &Utf8Path,
199 0 : name: &LayerName,
200 0 : disk_consistent_lsn: Lsn,
201 0 : ) -> anyhow::Result<()> {
202 0 : // future image layers are allowed to be produced always for not yet flushed to disk
203 0 : // lsns stored in InMemoryLayer.
204 0 : let kind = name.kind();
205 0 : tracing::info!("found future {kind} layer {name} disk_consistent_lsn is {disk_consistent_lsn}");
206 0 : std::fs::remove_file(path)?;
207 0 : Ok(())
208 0 : }
209 :
210 0 : pub(super) fn cleanup_local_only_file(
211 0 : name: &LayerName,
212 0 : local: &LocalLayerFileMetadata,
213 0 : ) -> anyhow::Result<()> {
214 0 : let kind = name.kind();
215 0 : tracing::info!(
216 0 : "found local-only {kind} layer {name} size {}",
217 : local.file_size
218 : );
219 0 : std::fs::remove_file(&local.local_path)?;
220 0 : Ok(())
221 0 : }
|