Line data Source code
1 : use crate::{
2 : is_temporary,
3 : tenant::{
4 : ephemeral_file::is_ephemeral_file,
5 : remote_timeline_client::{
6 : self,
7 : index::{IndexPart, LayerFileMetadata},
8 : },
9 : storage_layer::LayerFileName,
10 : Generation,
11 : },
12 : METADATA_FILE_NAME,
13 : };
14 : use anyhow::Context;
15 : use std::{collections::HashMap, ffi::OsString, path::Path, str::FromStr};
16 : use utils::lsn::Lsn;
17 :
18 : /// Classification of a single directory entry found by [`scan_timeline_dir`] in the timeline directory.
19 : pub(super) enum Discovered {
20 : /// A layer file: its parsed name and its on-disk size in bytes. The only kind we care about.
21 : Layer(LayerFileName, u64),
22 : /// Old ephemeral files from previous launches, should be removed. Carries the entry's file name.
23 : Ephemeral(OsString),
24 : /// Old temporary timeline files, unsure what these really are, should be removed. Carries the entry's file name.
25 : Temporary(OsString),
26 : /// Temporary on-demand download files, should be removed. Carries the entry's file name.
27 : TemporaryDownload(OsString),
28 : /// "metadata" file we persist locally and include in `index_part.json`
29 : Metadata,
30 : /// Backup file from previously future layers (any non-layer file whose name ends in `.old`); ignored.
31 : IgnoredBackup,
32 : /// Unrecognized entry that matched none of the above; warn about these. Carries the entry's file name.
33 : Unknown(OsString),
34 : }
35 :
36 : /// Scans the timeline directory for interesting files.
37 326 : pub(super) fn scan_timeline_dir(path: &Path) -> anyhow::Result<Vec<Discovered>> {
38 326 : let mut ret = Vec::new();
39 :
40 4264 : for direntry in std::fs::read_dir(path)? {
41 4264 : let direntry = direntry?;
42 4264 : let direntry_path = direntry.path();
43 4264 : let file_name = direntry.file_name();
44 4264 :
45 4264 : let fname = file_name.to_string_lossy();
46 :
47 4264 : let discovered = match LayerFileName::from_str(&fname) {
48 3850 : Ok(file_name) => {
49 3850 : let file_size = direntry.metadata()?.len();
50 3850 : Discovered::Layer(file_name, file_size)
51 : }
52 : Err(_) => {
53 414 : if fname == METADATA_FILE_NAME {
54 326 : Discovered::Metadata
55 88 : } else if fname.ends_with(".old") {
56 : // ignore these
57 0 : Discovered::IgnoredBackup
58 88 : } else if remote_timeline_client::is_temp_download_file(&direntry_path) {
59 0 : Discovered::TemporaryDownload(file_name)
60 88 : } else if is_ephemeral_file(&fname) {
61 83 : Discovered::Ephemeral(file_name)
62 5 : } else if is_temporary(&direntry_path) {
63 5 : Discovered::Temporary(file_name)
64 : } else {
65 0 : Discovered::Unknown(file_name)
66 : }
67 : }
68 : };
69 :
70 4264 : ret.push(discovered);
71 : }
72 :
73 326 : Ok(ret)
74 326 : }
75 :
76 : /// Decision on what to do with a layer file after considering its local and remote metadata; produced by [`reconcile`].
77 0 : #[derive(Clone)]
78 : pub(super) enum Decision {
79 : /// The layer is not present locally, only in the remote index. Carries the remote metadata.
80 : Evicted(LayerFileMetadata),
81 : /// The layer is present locally, but local metadata does not match remote; we must
82 : /// delete it and treat it as evicted. Both sides' metadata are kept for reporting.
83 : UseRemote {
84 : local: LayerFileMetadata,
85 : remote: LayerFileMetadata,
86 : },
87 : /// The layer is present locally, and metadata matches. Carries the local metadata.
88 : UseLocal(LayerFileMetadata),
89 : /// The layer is only known locally, it needs to be uploaded. Carries the local metadata.
90 : NeedsUpload(LayerFileMetadata),
91 : }
92 :
93 : /// The related layer is in the future compared to disk_consistent_lsn; it must not be loaded.
94 0 : #[derive(Debug)]
95 : pub(super) struct FutureLayer {
96 : /// The local metadata. `None` if the layer is only known through [`IndexPart`].
97 : pub(super) local: Option<LayerFileMetadata>,
98 : }
99 :
100 : /// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
101 : ///
102 : /// This function should not gain additional reasons to fail than [`FutureLayer`], consider adding
103 : /// the checks earlier to [`scan_timeline_dir`].
104 326 : pub(super) fn reconcile(
105 326 : discovered: Vec<(LayerFileName, u64)>,
106 326 : index_part: Option<&IndexPart>,
107 326 : disk_consistent_lsn: Lsn,
108 326 : generation: Generation,
109 326 : ) -> Vec<(LayerFileName, Result<Decision, FutureLayer>)> {
110 326 : use Decision::*;
111 326 :
112 326 : // name => (local, remote)
113 326 : type Collected = HashMap<LayerFileName, (Option<LayerFileMetadata>, Option<LayerFileMetadata>)>;
114 326 :
115 326 : let mut discovered = discovered
116 326 : .into_iter()
117 3850 : .map(|(name, file_size)| {
118 3850 : (
119 3850 : name,
120 3850 : // The generation here will be corrected to match IndexPart in the merge below, unless
121 3850 : // it is not in IndexPart, in which case using our current generation makes sense
122 3850 : // because it will be uploaded in this generation.
123 3850 : (Some(LayerFileMetadata::new(file_size, generation)), None),
124 3850 : )
125 3850 : })
126 326 : .collect::<Collected>();
127 326 :
128 326 : // merge any index_part information, when available
129 326 : index_part
130 326 : .as_ref()
131 326 : .map(|ip| ip.layer_metadata.iter())
132 326 : .into_iter()
133 326 : .flatten()
134 5490 : .map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
135 326 : .for_each(|(name, metadata)| {
136 5490 : if let Some(existing) = discovered.get_mut(name) {
137 2999 : existing.1 = Some(metadata);
138 2999 : } else {
139 2491 : discovered.insert(name.to_owned(), (None, Some(metadata)));
140 2491 : }
141 5490 : });
142 326 :
143 326 : discovered
144 326 : .into_iter()
145 6341 : .map(|(name, (local, remote))| {
146 6341 : let decision = if name.is_in_future(disk_consistent_lsn) {
147 1 : Err(FutureLayer { local })
148 : } else {
149 6340 : Ok(match (local, remote) {
150 2999 : (Some(local), Some(remote)) if local != remote => {
151 1 : assert_eq!(local.generation, remote.generation);
152 :
153 1 : UseRemote { local, remote }
154 : }
155 2998 : (Some(x), Some(_)) => UseLocal(x),
156 2491 : (None, Some(x)) => Evicted(x),
157 850 : (Some(x), None) => NeedsUpload(x),
158 : (None, None) => {
159 0 : unreachable!("there must not be any non-local non-remote files")
160 : }
161 : })
162 : };
163 :
164 6341 : (name, decision)
165 6341 : })
166 326 : .collect::<Vec<_>>()
167 326 : }
168 :
169 88 : pub(super) fn cleanup(path: &Path, kind: &str) -> anyhow::Result<()> {
170 88 : let file_name = path.file_name().expect("must be file path");
171 88 : tracing::debug!(kind, ?file_name, "cleaning up");
172 88 : std::fs::remove_file(path)
173 88 : .with_context(|| format!("failed to remove {kind} at {}", path.display()))
174 88 : }
175 :
176 1 : pub(super) fn cleanup_local_file_for_remote(
177 1 : path: &Path,
178 1 : local: &LayerFileMetadata,
179 1 : remote: &LayerFileMetadata,
180 1 : ) -> anyhow::Result<()> {
181 1 : let local_size = local.file_size();
182 1 : let remote_size = remote.file_size();
183 1 :
184 1 : let file_name = path.file_name().expect("must be file path");
185 1 : tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
186 1 : if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
187 0 : assert!(
188 0 : path.exists(),
189 0 : "we would leave the local_layer without a file if this does not hold: {}",
190 0 : path.display()
191 : );
192 0 : Err(err)
193 : } else {
194 1 : Ok(())
195 : }
196 1 : }
197 :
198 1 : pub(super) fn cleanup_future_layer(
199 1 : path: &Path,
200 1 : name: &LayerFileName,
201 1 : disk_consistent_lsn: Lsn,
202 1 : ) -> anyhow::Result<()> {
203 : use LayerFileName::*;
204 1 : let kind = match name {
205 1 : Delta(_) => "delta",
206 0 : Image(_) => "image",
207 : };
208 : // future image layers are allowed to be produced always for not yet flushed to disk
209 : // lsns stored in InMemoryLayer.
210 1 : tracing::info!("found future {kind} layer {name} disk_consistent_lsn is {disk_consistent_lsn}");
211 1 : crate::tenant::timeline::rename_to_backup(path)?;
212 1 : Ok(())
213 1 : }
|