TLA Line data Source code
1 : use super::storage_layer::LayerFileName;
2 : use super::storage_layer::ResidentLayer;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use std::collections::{HashMap, VecDeque};
7 : use std::fmt::Debug;
8 :
9 : use chrono::NaiveDateTime;
10 : use std::sync::Arc;
11 : use tracing::info;
12 : use utils::lsn::AtomicLsn;
13 :
14 : use std::sync::atomic::AtomicU32;
15 : use utils::lsn::Lsn;
16 :
17 : #[cfg(feature = "testing")]
18 : use utils::generation::Generation;
19 :
/// Lifecycle state of a timeline's remote upload queue.
// clippy warns that Uninitialized is much smaller than Initialized, which wastes
// memory for Uninitialized variants. Doesn't matter in practice, there are not
// that many upload queues in a running pageserver, and most of them are initialized
// anyway.
#[allow(clippy::large_enum_variant)]
pub(super) enum UploadQueue {
    /// No state yet; `initialize_empty_remote` or
    /// `initialize_with_current_remote_index_part` must be called before the queue
    /// can be used (`initialized_mut` fails in this state).
    Uninitialized,
    /// Accepting and tracking upload and deletion operations.
    Initialized(UploadQueueInitialized),
    /// No longer usable through `initialized_mut`; retains the former queue state
    /// (see `UploadQueueStopped::upload_queue_for_deletion`).
    Stopped(UploadQueueStopped),
}
30 :
31 : impl UploadQueue {
32 CBC 7 : pub fn as_str(&self) -> &'static str {
33 7 : match self {
34 UBC 0 : UploadQueue::Uninitialized => "Uninitialized",
35 0 : UploadQueue::Initialized(_) => "Initialized",
36 CBC 7 : UploadQueue::Stopped(_) => "Stopped",
37 : }
38 7 : }
39 : }
40 :
/// State of an upload queue that is accepting work: keeps track of queued and
/// in-progress tasks, and of what remote storage will contain once they complete.
pub(crate) struct UploadQueueInitialized {
    /// Counter to assign task IDs.
    pub(crate) task_counter: u64,

    /// All layer files stored in the remote storage, taking into account all
    /// in-progress and queued operations.
    pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,

    /// How many file uploads or deletions have been scheduled since the
    /// last (scheduling of a) metadata index upload?
    pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,

    /// Metadata stored in the remote storage, taking into account all
    /// in-progress and queued operations.
    /// DANGER: do not return to outside world, e.g., safekeepers.
    pub(crate) latest_metadata: TimelineMetadata,

    /// `disk_consistent_lsn` from the last metadata file that was successfully
    /// uploaded, or `None` if nothing was uploaded yet.
    /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
    ///
    /// This value is not necessarily safe to expose yet: see
    /// `visible_remote_consistent_lsn`.
    pub(crate) projected_remote_consistent_lsn: Option<Lsn>,

    /// The remote consistent LSN that may be exposed outside the pageserver.
    /// `Lsn(0)` if nothing was uploaded yet.
    /// NOTE(review): presumably this is the value safekeepers rely on to make
    /// decisions for WAL storage — confirm at the call sites.
    ///
    /// `visible_remote_consistent_lsn` is only updated after our generation has been
    /// validated with the control plane (unless a timeline's generation is None, in
    /// which case we skip validation).
    pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,

    // Breakdown of different kinds of tasks currently in-progress
    pub(crate) num_inprogress_layer_uploads: usize,
    pub(crate) num_inprogress_metadata_uploads: usize,
    pub(crate) num_inprogress_deletions: usize,

    /// Tasks that are currently in-progress. In-progress means that a tokio Task
    /// has been launched for it. An in-progress task can be busy uploading, but it can
    /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
    /// be waiting for retry in `exponential_backoff`.
    pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,

    /// Queued operations that have not been launched yet. They might depend on previous
    /// tasks to finish. For example, metadata upload cannot be performed before all
    /// preceding layer file uploads have completed.
    pub(crate) queued_operations: VecDeque<UploadOp>,

    /// Files which have been unlinked but for which no deletion has been scheduled yet.
    /// Only kept around for error logging.
    ///
    /// This is behind the `testing` feature to catch problems in tests: if a bug caused
    /// entries to leak, it is better not to have this map growing in production builds.
    #[cfg(feature = "testing")]
    pub(crate) dangling_files: HashMap<LayerFileName, Generation>,

    /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
    /// While set, `UploadQueue::initialized_mut` returns an error.
    pub(crate) shutting_down: bool,

    /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
    /// wait until one of them stops the queue. The semaphore is closed when
    /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
    pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
}
102 :
103 : impl UploadQueueInitialized {
104 171 : pub(super) fn no_pending_work(&self) -> bool {
105 171 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
106 171 : }
107 :
108 568864 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
109 568864 : self.visible_remote_consistent_lsn.load()
110 568864 : }
111 :
112 2993 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
113 2993 : self.projected_remote_consistent_lsn
114 2993 : }
115 : }
116 :
/// Progress of setting the "deleted" flag for a stopped upload queue.
/// NOTE(review): presumably this tracks writing the deletion marker into the remote
/// index part — confirm at the code that transitions this value.
#[derive(Clone, Copy)]
pub(super) enum SetDeletedFlagProgress {
    /// Setting the flag has not been started.
    NotRunning,
    /// Setting the flag has started; presumably carries the deletion timestamp being
    /// recorded — TODO confirm.
    InProgress(NaiveDateTime),
    /// The flag was set successfully; presumably carries the recorded deletion
    /// timestamp — TODO confirm.
    Successful(NaiveDateTime),
}
123 :
/// Payload of `UploadQueue::Stopped`.
pub(super) struct UploadQueueStopped {
    /// The queue state retained after stopping.
    /// NOTE(review): per the field name it is kept for timeline deletion — confirm
    /// where the queue is transitioned to `Stopped`.
    pub(super) upload_queue_for_deletion: UploadQueueInitialized,
    /// Progress of setting the deleted flag.
    pub(super) deleted_at: SetDeletedFlagProgress,
}
128 :
129 : impl UploadQueue {
130 CBC 921 : pub(crate) fn initialize_empty_remote(
131 921 : &mut self,
132 921 : metadata: &TimelineMetadata,
133 921 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
134 921 : match self {
135 921 : UploadQueue::Uninitialized => (),
136 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
137 UBC 0 : anyhow::bail!("already initialized, state {}", self.as_str())
138 : }
139 : }
140 :
141 CBC 921 : info!("initializing upload queue for empty remote");
142 :
143 921 : let state = UploadQueueInitialized {
144 921 : // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
145 921 : latest_files: HashMap::new(),
146 921 : latest_files_changes_since_metadata_upload_scheduled: 0,
147 921 : latest_metadata: metadata.clone(),
148 921 : projected_remote_consistent_lsn: None,
149 921 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
150 921 : // what follows are boring default initializations
151 921 : task_counter: 0,
152 921 : num_inprogress_layer_uploads: 0,
153 921 : num_inprogress_metadata_uploads: 0,
154 921 : num_inprogress_deletions: 0,
155 921 : inprogress_tasks: HashMap::new(),
156 921 : queued_operations: VecDeque::new(),
157 921 : #[cfg(feature = "testing")]
158 921 : dangling_files: HashMap::new(),
159 921 : shutting_down: false,
160 921 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
161 921 : };
162 921 :
163 921 : *self = UploadQueue::Initialized(state);
164 921 : Ok(self.initialized_mut().expect("we just set it"))
165 921 : }
166 :
167 369 : pub(crate) fn initialize_with_current_remote_index_part(
168 369 : &mut self,
169 369 : index_part: &IndexPart,
170 369 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
171 369 : match self {
172 369 : UploadQueue::Uninitialized => (),
173 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
174 UBC 0 : anyhow::bail!("already initialized, state {}", self.as_str())
175 : }
176 : }
177 :
178 CBC 369 : let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
179 58550 : for (layer_name, layer_metadata) in &index_part.layer_metadata {
180 58181 : files.insert(
181 58181 : layer_name.to_owned(),
182 58181 : LayerFileMetadata::from(layer_metadata),
183 58181 : );
184 58181 : }
185 :
186 369 : info!(
187 369 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
188 369 : index_part.metadata.disk_consistent_lsn()
189 369 : );
190 :
191 369 : let state = UploadQueueInitialized {
192 369 : latest_files: files,
193 369 : latest_files_changes_since_metadata_upload_scheduled: 0,
194 369 : latest_metadata: index_part.metadata.clone(),
195 369 : projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
196 369 : visible_remote_consistent_lsn: Arc::new(
197 369 : index_part.metadata.disk_consistent_lsn().into(),
198 369 : ),
199 369 : // what follows are boring default initializations
200 369 : task_counter: 0,
201 369 : num_inprogress_layer_uploads: 0,
202 369 : num_inprogress_metadata_uploads: 0,
203 369 : num_inprogress_deletions: 0,
204 369 : inprogress_tasks: HashMap::new(),
205 369 : queued_operations: VecDeque::new(),
206 369 : #[cfg(feature = "testing")]
207 369 : dangling_files: HashMap::new(),
208 369 : shutting_down: false,
209 369 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
210 369 : };
211 369 :
212 369 : *self = UploadQueue::Initialized(state);
213 369 : Ok(self.initialized_mut().expect("we just set it"))
214 369 : }
215 :
216 24314 : pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
217 24314 : match self {
218 : UploadQueue::Uninitialized | UploadQueue::Stopped(_) => {
219 7 : anyhow::bail!("queue is in state {}", self.as_str())
220 : }
221 24307 : UploadQueue::Initialized(x) => {
222 24307 : if !x.shutting_down {
223 24307 : Ok(x)
224 : } else {
225 UBC 0 : anyhow::bail!("queue is shutting down")
226 : }
227 : }
228 : }
229 CBC 24314 : }
230 :
231 515 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
232 515 : match self {
233 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
234 UBC 0 : anyhow::bail!("queue is in state {}", self.as_str())
235 : }
236 CBC 515 : UploadQueue::Stopped(stopped) => Ok(stopped),
237 : }
238 515 : }
239 : }
240 :
/// An in-progress upload or delete task.
#[derive(Debug)]
pub(crate) struct UploadTask {
    /// Unique ID of this task. Used as the key in `UploadQueueInitialized::inprogress_tasks`.
    pub(crate) task_id: u64,
    /// Retry counter for this task.
    /// NOTE(review): presumably incremented by the task runner on each retry — confirm.
    pub(crate) retries: AtomicU32,

    /// The operation this task performs.
    pub(crate) op: UploadOp,
}
250 :
/// A deletion of some layers within the lifetime of a timeline. This is not used
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
#[derive(Debug)]
pub(crate) struct Delete {
    /// Layer files to delete, each paired with its `LayerFileMetadata`.
    pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
}
257 :
/// An operation that can be placed in `UploadQueueInitialized::queued_operations`.
#[derive(Debug)]
pub(crate) enum UploadOp {
    /// Upload a layer file
    UploadLayer(ResidentLayer, LayerFileMetadata),

    /// Upload the metadata (index part) file.
    /// NOTE(review): the `Lsn` is presumably the `disk_consistent_lsn` of the uploaded
    /// metadata — confirm at the scheduling site.
    UploadMetadata(IndexPart, Lsn),

    /// Delete layer files
    Delete(Delete),

    /// Barrier. When the barrier operation is reached, the waiters holding the
    /// receiving side of this `watch` channel are notified.
    /// NOTE(review): presumably "reached" means all preceding operations have
    /// completed — confirm in `launch_queued_tasks`.
    Barrier(tokio::sync::watch::Sender<()>),

    /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
    /// this is the same as a Barrier.
    Shutdown,
}
276 :
277 : impl std::fmt::Display for UploadOp {
278 CBC 4676 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
279 4676 : match self {
280 3424 : UploadOp::UploadLayer(layer, metadata) => {
281 3424 : write!(
282 3424 : f,
283 3424 : "UploadLayer({}, size={:?}, gen={:?})",
284 3424 : layer,
285 3424 : metadata.file_size(),
286 3424 : metadata.generation
287 3424 : )
288 : }
289 1252 : UploadOp::UploadMetadata(_, lsn) => {
290 1252 : write!(f, "UploadMetadata(lsn: {})", lsn)
291 : }
292 UBC 0 : UploadOp::Delete(delete) => {
293 0 : write!(f, "Delete({} layers)", delete.layers.len())
294 : }
295 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
296 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
297 : }
298 CBC 4676 : }
299 : }
|