Line data Source code
1 : use crate::{
2 : is_temporary,
3 : tenant::{
4 : ephemeral_file::is_ephemeral_file,
5 : remote_timeline_client::{
6 : self,
7 : index::{IndexPart, LayerFileMetadata},
8 : },
9 : storage_layer::LayerFileName,
10 : Generation,
11 : },
12 : METADATA_FILE_NAME,
13 : };
14 : use anyhow::Context;
15 : use camino::Utf8Path;
16 : use pageserver_api::shard::ShardIndex;
17 : use std::{collections::HashMap, str::FromStr};
18 : use utils::lsn::Lsn;
19 :
20 : /// Identified files in the timeline directory.
21 : pub(super) enum Discovered {
22 : /// The only one we care about
23 : Layer(LayerFileName, u64),
24 : /// Old ephmeral files from previous launches, should be removed
25 : Ephemeral(String),
26 : /// Old temporary timeline files, unsure what these really are, should be removed
27 : Temporary(String),
28 : /// Temporary on-demand download files, should be removed
29 : TemporaryDownload(String),
30 : /// "metadata" file we persist locally and include in `index_part.json`
31 : Metadata,
32 : /// Backup file from previously future layers
33 : IgnoredBackup,
34 : /// Unrecognized, warn about these
35 : Unknown(String),
36 : }
37 :
38 : /// Scans the timeline directory for interesting files.
39 412 : pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovered>> {
40 412 : let mut ret = Vec::new();
41 :
42 13811 : for direntry in path.read_dir_utf8()? {
43 13811 : let direntry = direntry?;
44 13811 : let file_name = direntry.file_name().to_string();
45 :
46 13811 : let discovered = match LayerFileName::from_str(&file_name) {
47 13348 : Ok(file_name) => {
48 13348 : let file_size = direntry.metadata()?.len();
49 13348 : Discovered::Layer(file_name, file_size)
50 : }
51 : Err(_) => {
52 463 : if file_name == METADATA_FILE_NAME {
53 412 : Discovered::Metadata
54 51 : } else if file_name.ends_with(".old") {
55 : // ignore these
56 0 : Discovered::IgnoredBackup
57 51 : } else if remote_timeline_client::is_temp_download_file(direntry.path()) {
58 3 : Discovered::TemporaryDownload(file_name)
59 48 : } else if is_ephemeral_file(&file_name) {
60 47 : Discovered::Ephemeral(file_name)
61 1 : } else if is_temporary(direntry.path()) {
62 1 : Discovered::Temporary(file_name)
63 : } else {
64 0 : Discovered::Unknown(file_name)
65 : }
66 : }
67 : };
68 :
69 13811 : ret.push(discovered);
70 : }
71 :
72 412 : Ok(ret)
73 412 : }
74 :
75 : /// Decision on what to do with a layer file after considering its local and remote metadata.
76 0 : #[derive(Clone, Debug)]
77 : pub(super) enum Decision {
78 : /// The layer is not present locally.
79 : Evicted(LayerFileMetadata),
80 : /// The layer is present locally, but local metadata does not match remote; we must
81 : /// delete it and treat it as evicted.
82 : UseRemote {
83 : local: LayerFileMetadata,
84 : remote: LayerFileMetadata,
85 : },
86 : /// The layer is present locally, and metadata matches.
87 : UseLocal(LayerFileMetadata),
88 : }
89 :
90 : /// A layer needs to be left out of the layer map.
91 0 : #[derive(Debug)]
92 : pub(super) enum DismissedLayer {
93 : /// The related layer is is in future compared to disk_consistent_lsn, it must not be loaded.
94 : Future {
95 : /// The local metadata. `None` if the layer is only known through [`IndexPart`].
96 : local: Option<LayerFileMetadata>,
97 : },
98 : /// The layer only exists locally.
99 : ///
100 : /// In order to make crash safe updates to layer map, we must dismiss layers which are only
101 : /// found locally or not yet included in the remote `index_part.json`.
102 : LocalOnly(LayerFileMetadata),
103 : }
104 :
105 : /// Merges local discoveries and remote [`IndexPart`] to a collection of decisions.
106 412 : pub(super) fn reconcile(
107 412 : discovered: Vec<(LayerFileName, u64)>,
108 412 : index_part: Option<&IndexPart>,
109 412 : disk_consistent_lsn: Lsn,
110 412 : generation: Generation,
111 412 : shard: ShardIndex,
112 412 : ) -> Vec<(LayerFileName, Result<Decision, DismissedLayer>)> {
113 412 : use Decision::*;
114 412 :
115 412 : // name => (local, remote)
116 412 : type Collected = HashMap<LayerFileName, (Option<LayerFileMetadata>, Option<LayerFileMetadata>)>;
117 412 :
118 412 : let mut discovered = discovered
119 412 : .into_iter()
120 13348 : .map(|(name, file_size)| {
121 13348 : (
122 13348 : name,
123 13348 : // The generation and shard here will be corrected to match IndexPart in the merge below, unless
124 13348 : // it is not in IndexPart, in which case using our current generation makes sense
125 13348 : // because it will be uploaded in this generation.
126 13348 : (
127 13348 : Some(LayerFileMetadata::new(file_size, generation, shard)),
128 13348 : None,
129 13348 : ),
130 13348 : )
131 13348 : })
132 412 : .collect::<Collected>();
133 412 :
134 412 : // merge any index_part information, when available
135 412 : index_part
136 412 : .as_ref()
137 412 : .map(|ip| ip.layer_metadata.iter())
138 412 : .into_iter()
139 412 : .flatten()
140 56829 : .map(|(name, metadata)| (name, LayerFileMetadata::from(metadata)))
141 56829 : .for_each(|(name, metadata)| {
142 56829 : if let Some(existing) = discovered.get_mut(name) {
143 12712 : existing.1 = Some(metadata);
144 44117 : } else {
145 44117 : discovered.insert(name.to_owned(), (None, Some(metadata)));
146 44117 : }
147 56829 : });
148 412 :
149 412 : discovered
150 412 : .into_iter()
151 57465 : .map(|(name, (local, remote))| {
152 57465 : let decision = if name.is_in_future(disk_consistent_lsn) {
153 2 : Err(DismissedLayer::Future { local })
154 : } else {
155 57463 : match (local, remote) {
156 12712 : (Some(local), Some(remote)) if local != remote => {
157 12212 : Ok(UseRemote { local, remote })
158 : }
159 500 : (Some(x), Some(_)) => Ok(UseLocal(x)),
160 44116 : (None, Some(x)) => Ok(Evicted(x)),
161 635 : (Some(x), None) => Err(DismissedLayer::LocalOnly(x)),
162 : (None, None) => {
163 0 : unreachable!("there must not be any non-local non-remote files")
164 : }
165 : }
166 : };
167 :
168 57465 : (name, decision)
169 57465 : })
170 412 : .collect::<Vec<_>>()
171 412 : }
172 :
173 51 : pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
174 51 : let file_name = path.file_name().expect("must be file path");
175 51 : tracing::debug!(kind, ?file_name, "cleaning up");
176 51 : std::fs::remove_file(path).with_context(|| format!("failed to remove {kind} at {path}"))
177 51 : }
178 :
179 1 : pub(super) fn cleanup_local_file_for_remote(
180 1 : path: &Utf8Path,
181 1 : local: &LayerFileMetadata,
182 1 : remote: &LayerFileMetadata,
183 1 : ) -> anyhow::Result<()> {
184 1 : let local_size = local.file_size();
185 1 : let remote_size = remote.file_size();
186 1 :
187 1 : let file_name = path.file_name().expect("must be file path");
188 1 : tracing::warn!("removing local file {file_name:?} because it has unexpected length {local_size}; length in remote index is {remote_size}");
189 1 : if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
190 0 : assert!(
191 0 : path.exists(),
192 0 : "we would leave the local_layer without a file if this does not hold: {path}",
193 : );
194 0 : Err(err)
195 : } else {
196 1 : Ok(())
197 : }
198 1 : }
199 :
200 1 : pub(super) fn cleanup_future_layer(
201 1 : path: &Utf8Path,
202 1 : name: &LayerFileName,
203 1 : disk_consistent_lsn: Lsn,
204 1 : ) -> anyhow::Result<()> {
205 1 : // future image layers are allowed to be produced always for not yet flushed to disk
206 1 : // lsns stored in InMemoryLayer.
207 1 : let kind = name.kind();
208 1 : tracing::info!("found future {kind} layer {name} disk_consistent_lsn is {disk_consistent_lsn}");
209 1 : std::fs::remove_file(path)?;
210 1 : Ok(())
211 1 : }
212 :
213 635 : pub(super) fn cleanup_local_only_file(
214 635 : path: &Utf8Path,
215 635 : name: &LayerFileName,
216 635 : local: &LayerFileMetadata,
217 635 : ) -> anyhow::Result<()> {
218 635 : let kind = name.kind();
219 635 : tracing::info!("found local-only {kind} layer {name}, metadata {local:?}");
220 635 : std::fs::remove_file(path)?;
221 635 : Ok(())
222 635 : }
|