Line data Source code
1 : use crate::metrics::RemoteOpFileKind;
2 :
3 : use super::storage_layer::LayerFileName;
4 : use super::Generation;
5 : use crate::tenant::metadata::TimelineMetadata;
6 : use crate::tenant::remote_timeline_client::index::IndexPart;
7 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
8 : use std::collections::{HashMap, VecDeque};
9 : use std::fmt::Debug;
10 :
11 : use chrono::NaiveDateTime;
12 : use std::sync::Arc;
13 : use tracing::info;
14 :
15 : use std::sync::atomic::AtomicU32;
16 : use utils::lsn::Lsn;
17 :
18 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
19 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
20 : // that many upload queues in a running pageserver, and most of them are initialized
21 : // anyway.
22 : #[allow(clippy::large_enum_variant)]
23 : pub(super) enum UploadQueue {
24 : Uninitialized,
25 : Initialized(UploadQueueInitialized),
26 : Stopped(UploadQueueStopped),
27 : }
28 :
29 : impl UploadQueue {
30 0 : pub fn as_str(&self) -> &'static str {
31 0 : match self {
32 0 : UploadQueue::Uninitialized => "Uninitialized",
33 0 : UploadQueue::Initialized(_) => "Initialized",
34 0 : UploadQueue::Stopped(_) => "Stopped",
35 : }
36 0 : }
37 : }
38 :
39 : /// This keeps track of queued and in-progress tasks.
40 : pub(crate) struct UploadQueueInitialized {
41 : /// Counter to assign task IDs
42 : pub(crate) task_counter: u64,
43 :
44 : /// All layer files stored in the remote storage, taking into account all
45 : /// in-progress and queued operations
46 : pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,
47 :
48 : /// How many file uploads or deletions been scheduled, since the
49 : /// last (scheduling of) metadata index upload?
50 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
51 :
52 : /// Metadata stored in the remote storage, taking into account all
53 : /// in-progress and queued operations.
54 : /// DANGER: do not return to outside world, e.g., safekeepers.
55 : pub(crate) latest_metadata: TimelineMetadata,
56 :
57 : /// `disk_consistent_lsn` from the last metadata file that was successfully
58 : /// uploaded. `Lsn(0)` if nothing was uploaded yet.
59 : /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
60 : /// Safekeeper can rely on it to make decisions for WAL storage.
61 : pub(crate) last_uploaded_consistent_lsn: Lsn,
62 :
63 : // Breakdown of different kinds of tasks currently in-progress
64 : pub(crate) num_inprogress_layer_uploads: usize,
65 : pub(crate) num_inprogress_metadata_uploads: usize,
66 : pub(crate) num_inprogress_deletions: usize,
67 :
68 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
69 : /// has been launched for it. An in-progress task can be busy uploading, but it can
70 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
71 : /// be waiting for retry in `exponential_backoff`.
72 : pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
73 :
74 : /// Queued operations that have not been launched yet. They might depend on previous
75 : /// tasks to finish. For example, metadata upload cannot be performed before all
76 : /// preceding layer file uploads have completed.
77 : pub(crate) queued_operations: VecDeque<UploadOp>,
78 : }
79 :
80 : impl UploadQueueInitialized {
81 203 : pub(super) fn no_pending_work(&self) -> bool {
82 203 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
83 203 : }
84 : }
85 :
86 0 : #[derive(Clone, Copy)]
87 : pub(super) enum SetDeletedFlagProgress {
88 : NotRunning,
89 : InProgress(NaiveDateTime),
90 : Successful(NaiveDateTime),
91 : }
92 :
93 : pub(super) struct UploadQueueStopped {
94 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
95 : pub(super) deleted_at: SetDeletedFlagProgress,
96 : }
97 :
98 : impl UploadQueue {
99 564 : pub(crate) fn initialize_empty_remote(
100 564 : &mut self,
101 564 : metadata: &TimelineMetadata,
102 564 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
103 564 : match self {
104 564 : UploadQueue::Uninitialized => (),
105 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
106 0 : anyhow::bail!("already initialized, state {}", self.as_str())
107 : }
108 : }
109 :
110 564 : info!("initializing upload queue for empty remote");
111 :
112 564 : let state = UploadQueueInitialized {
113 564 : // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
114 564 : latest_files: HashMap::new(),
115 564 : latest_files_changes_since_metadata_upload_scheduled: 0,
116 564 : latest_metadata: metadata.clone(),
117 564 : // We haven't uploaded anything yet, so, `last_uploaded_consistent_lsn` must be 0 to prevent
118 564 : // safekeepers from garbage-collecting anything.
119 564 : last_uploaded_consistent_lsn: Lsn(0),
120 564 : // what follows are boring default initializations
121 564 : task_counter: 0,
122 564 : num_inprogress_layer_uploads: 0,
123 564 : num_inprogress_metadata_uploads: 0,
124 564 : num_inprogress_deletions: 0,
125 564 : inprogress_tasks: HashMap::new(),
126 564 : queued_operations: VecDeque::new(),
127 564 : };
128 564 :
129 564 : *self = UploadQueue::Initialized(state);
130 564 : Ok(self.initialized_mut().expect("we just set it"))
131 564 : }
132 :
133 199 : pub(crate) fn initialize_with_current_remote_index_part(
134 199 : &mut self,
135 199 : index_part: &IndexPart,
136 199 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
137 199 : match self {
138 199 : UploadQueue::Uninitialized => (),
139 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
140 0 : anyhow::bail!("already initialized, state {}", self.as_str())
141 : }
142 : }
143 :
144 199 : let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
145 6481 : for (layer_name, layer_metadata) in &index_part.layer_metadata {
146 6282 : files.insert(
147 6282 : layer_name.to_owned(),
148 6282 : LayerFileMetadata::from(layer_metadata),
149 6282 : );
150 6282 : }
151 :
152 199 : info!(
153 199 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
154 199 : index_part.metadata.disk_consistent_lsn()
155 199 : );
156 :
157 199 : let state = UploadQueueInitialized {
158 199 : latest_files: files,
159 199 : latest_files_changes_since_metadata_upload_scheduled: 0,
160 199 : latest_metadata: index_part.metadata.clone(),
161 199 : last_uploaded_consistent_lsn: index_part.metadata.disk_consistent_lsn(),
162 199 : // what follows are boring default initializations
163 199 : task_counter: 0,
164 199 : num_inprogress_layer_uploads: 0,
165 199 : num_inprogress_metadata_uploads: 0,
166 199 : num_inprogress_deletions: 0,
167 199 : inprogress_tasks: HashMap::new(),
168 199 : queued_operations: VecDeque::new(),
169 199 : };
170 199 :
171 199 : *self = UploadQueue::Initialized(state);
172 199 : Ok(self.initialized_mut().expect("we just set it"))
173 199 : }
174 :
175 19325 : pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
176 19325 : match self {
177 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
178 0 : anyhow::bail!("queue is in state {}", self.as_str())
179 : }
180 19325 : UploadQueue::Initialized(x) => Ok(x),
181 : }
182 19325 : }
183 :
184 614 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
185 614 : match self {
186 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
187 0 : anyhow::bail!("queue is in state {}", self.as_str())
188 : }
189 614 : UploadQueue::Stopped(stopped) => Ok(stopped),
190 : }
191 614 : }
192 : }
193 :
194 : /// An in-progress upload or delete task.
195 0 : #[derive(Debug)]
196 : pub(crate) struct UploadTask {
197 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
198 : pub(crate) task_id: u64,
199 : pub(crate) retries: AtomicU32,
200 :
201 : pub(crate) op: UploadOp,
202 : }
203 :
204 0 : #[derive(Debug)]
205 : pub(crate) struct Delete {
206 : pub(crate) file_kind: RemoteOpFileKind,
207 : pub(crate) layer_file_name: LayerFileName,
208 : pub(crate) scheduled_from_timeline_delete: bool,
209 : pub(crate) generation: Generation,
210 : }
211 :
212 0 : #[derive(Debug)]
213 : pub(crate) enum UploadOp {
214 : /// Upload a layer file
215 : UploadLayer(LayerFileName, LayerFileMetadata),
216 :
217 : /// Upload the metadata file
218 : UploadMetadata(IndexPart, Lsn),
219 :
220 : /// Delete a layer file
221 : Delete(Delete),
222 :
223 : /// Barrier. When the barrier operation is reached,
224 : Barrier(tokio::sync::watch::Sender<()>),
225 : }
226 :
227 : impl std::fmt::Display for UploadOp {
228 9152 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
229 9152 : match self {
230 3818 : UploadOp::UploadLayer(path, metadata) => {
231 3818 : write!(
232 3818 : f,
233 3818 : "UploadLayer({}, size={:?}, gen={:?})",
234 3818 : path.file_name(),
235 3818 : metadata.file_size(),
236 3818 : metadata.generation,
237 3818 : )
238 : }
239 1440 : UploadOp::UploadMetadata(_, lsn) => {
240 1440 : write!(f, "UploadMetadata(lsn: {})", lsn)
241 : }
242 3894 : UploadOp::Delete(delete) => write!(
243 3894 : f,
244 3894 : "Delete(path: {}, scheduled_from_timeline_delete: {}, gen: {:?})",
245 3894 : delete.layer_file_name.file_name(),
246 3894 : delete.scheduled_from_timeline_delete,
247 3894 : delete.generation
248 3894 : ),
249 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
250 : }
251 9152 : }
252 : }
|