Line data Source code
1 : use super::storage_layer::LayerName;
2 : use super::storage_layer::ResidentLayer;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use std::collections::{HashMap, VecDeque};
7 : use std::fmt::Debug;
8 :
9 : use chrono::NaiveDateTime;
10 : use std::sync::Arc;
11 : use tracing::info;
12 : use utils::lsn::AtomicLsn;
13 :
14 : use std::sync::atomic::AtomicU32;
15 : use utils::lsn::Lsn;
16 :
17 : #[cfg(feature = "testing")]
18 : use utils::generation::Generation;
19 :
20 : // clippy warns that Uninitialized is much smaller than Initialized, which wastes
21 : // memory for Uninitialized variants. Doesn't matter in practice, there are not
22 : // that many upload queues in a running pageserver, and most of them are initialized
23 : // anyway.
24 : #[allow(clippy::large_enum_variant)]
25 : pub(super) enum UploadQueue {
26 : Uninitialized,
27 : Initialized(UploadQueueInitialized),
28 : Stopped(UploadQueueStopped),
29 : }
30 :
31 : impl UploadQueue {
32 0 : pub fn as_str(&self) -> &'static str {
33 0 : match self {
34 0 : UploadQueue::Uninitialized => "Uninitialized",
35 0 : UploadQueue::Initialized(_) => "Initialized",
36 0 : UploadQueue::Stopped(_) => "Stopped",
37 : }
38 0 : }
39 : }
40 :
41 : /// This keeps track of queued and in-progress tasks.
42 : pub(crate) struct UploadQueueInitialized {
43 : /// Counter to assign task IDs
44 : pub(crate) task_counter: u64,
45 :
46 : /// The next uploaded index_part.json; assumed to be dirty.
47 : ///
48 : /// Should not be read, directly except for layer file updates. Instead you should add a
49 : /// projected field.
50 : pub(crate) dirty: IndexPart,
51 :
52 : /// The latest remote persisted IndexPart.
53 : ///
54 : /// Each completed metadata upload will update this. The second item is the task_id which last
55 : /// updated the value, used to ensure we never store an older value over a newer one.
56 : pub(crate) clean: (IndexPart, Option<u64>),
57 :
58 : /// How many file uploads or deletions been scheduled, since the
59 : /// last (scheduling of) metadata index upload?
60 : pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,
61 :
62 : /// The Lsn is only updated after our generation has been validated with
63 : /// the control plane (unlesss a timeline's generation is None, in which case
64 : /// we skip validation)
65 : pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,
66 :
67 : // Breakdown of different kinds of tasks currently in-progress
68 : pub(crate) num_inprogress_layer_uploads: usize,
69 : pub(crate) num_inprogress_metadata_uploads: usize,
70 : pub(crate) num_inprogress_deletions: usize,
71 :
72 : /// Tasks that are currently in-progress. In-progress means that a tokio Task
73 : /// has been launched for it. An in-progress task can be busy uploading, but it can
74 : /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
75 : /// be waiting for retry in `exponential_backoff`.
76 : pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,
77 :
78 : /// Queued operations that have not been launched yet. They might depend on previous
79 : /// tasks to finish. For example, metadata upload cannot be performed before all
80 : /// preceding layer file uploads have completed.
81 : pub(crate) queued_operations: VecDeque<UploadOp>,
82 :
83 : /// Files which have been unlinked but not yet had scheduled a deletion for. Only kept around
84 : /// for error logging.
85 : ///
86 : /// Putting this behind a testing feature to catch problems in tests, but assuming we could have a
87 : /// bug causing leaks, then it's better to not leave this enabled for production builds.
88 : #[cfg(feature = "testing")]
89 : pub(crate) dangling_files: HashMap<LayerName, Generation>,
90 :
91 : /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
92 : pub(crate) shutting_down: bool,
93 :
94 : /// Permitless semaphore on which any number of `RemoteTimelineClient::shutdown` futures can
95 : /// wait on until one of them stops the queue. The semaphore is closed when
96 : /// `RemoteTimelineClient::launch_queued_tasks` encounters `UploadOp::Shutdown`.
97 : pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
98 : }
99 :
100 : impl UploadQueueInitialized {
101 8 : pub(super) fn no_pending_work(&self) -> bool {
102 8 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
103 8 : }
104 :
105 0 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
106 0 : self.visible_remote_consistent_lsn.load()
107 0 : }
108 :
109 0 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
110 0 : let lsn = self.clean.0.metadata.disk_consistent_lsn();
111 0 : self.clean.1.map(|_| lsn)
112 0 : }
113 : }
114 :
115 : #[derive(Clone, Copy)]
116 : pub(super) enum SetDeletedFlagProgress {
117 : NotRunning,
118 : InProgress(NaiveDateTime),
119 : Successful(NaiveDateTime),
120 : }
121 :
122 : pub(super) struct UploadQueueStoppedDeletable {
123 : pub(super) upload_queue_for_deletion: UploadQueueInitialized,
124 : pub(super) deleted_at: SetDeletedFlagProgress,
125 : }
126 :
127 : pub(super) enum UploadQueueStopped {
128 : Deletable(UploadQueueStoppedDeletable),
129 : Uninitialized,
130 : }
131 :
132 0 : #[derive(thiserror::Error, Debug)]
133 : pub enum NotInitialized {
134 : #[error("queue is in state Uninitialized")]
135 : Uninitialized,
136 : #[error("queue is in state Stopped")]
137 : Stopped,
138 : #[error("queue is shutting down")]
139 : ShuttingDown,
140 : }
141 :
142 : impl NotInitialized {
143 0 : pub(crate) fn is_stopping(&self) -> bool {
144 : use NotInitialized::*;
145 0 : match self {
146 0 : Uninitialized => false,
147 0 : Stopped => true,
148 0 : ShuttingDown => true,
149 : }
150 0 : }
151 : }
152 :
153 : impl UploadQueue {
154 412 : pub(crate) fn initialize_empty_remote(
155 412 : &mut self,
156 412 : metadata: &TimelineMetadata,
157 412 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
158 412 : match self {
159 412 : UploadQueue::Uninitialized => (),
160 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
161 0 : anyhow::bail!("already initialized, state {}", self.as_str())
162 : }
163 : }
164 :
165 412 : info!("initializing upload queue for empty remote");
166 :
167 412 : let index_part = IndexPart::empty(metadata.clone());
168 412 :
169 412 : let state = UploadQueueInitialized {
170 412 : dirty: index_part.clone(),
171 412 : clean: (index_part, None),
172 412 : latest_files_changes_since_metadata_upload_scheduled: 0,
173 412 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
174 412 : // what follows are boring default initializations
175 412 : task_counter: 0,
176 412 : num_inprogress_layer_uploads: 0,
177 412 : num_inprogress_metadata_uploads: 0,
178 412 : num_inprogress_deletions: 0,
179 412 : inprogress_tasks: HashMap::new(),
180 412 : queued_operations: VecDeque::new(),
181 412 : #[cfg(feature = "testing")]
182 412 : dangling_files: HashMap::new(),
183 412 : shutting_down: false,
184 412 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
185 412 : };
186 412 :
187 412 : *self = UploadQueue::Initialized(state);
188 412 : Ok(self.initialized_mut().expect("we just set it"))
189 412 : }
190 :
191 6 : pub(crate) fn initialize_with_current_remote_index_part(
192 6 : &mut self,
193 6 : index_part: &IndexPart,
194 6 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
195 6 : match self {
196 6 : UploadQueue::Uninitialized => (),
197 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
198 0 : anyhow::bail!("already initialized, state {}", self.as_str())
199 : }
200 : }
201 :
202 6 : info!(
203 0 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
204 0 : index_part.metadata.disk_consistent_lsn()
205 : );
206 :
207 6 : let state = UploadQueueInitialized {
208 6 : dirty: index_part.clone(),
209 6 : clean: (index_part.clone(), None),
210 6 : latest_files_changes_since_metadata_upload_scheduled: 0,
211 6 : visible_remote_consistent_lsn: Arc::new(
212 6 : index_part.metadata.disk_consistent_lsn().into(),
213 6 : ),
214 6 : // what follows are boring default initializations
215 6 : task_counter: 0,
216 6 : num_inprogress_layer_uploads: 0,
217 6 : num_inprogress_metadata_uploads: 0,
218 6 : num_inprogress_deletions: 0,
219 6 : inprogress_tasks: HashMap::new(),
220 6 : queued_operations: VecDeque::new(),
221 6 : #[cfg(feature = "testing")]
222 6 : dangling_files: HashMap::new(),
223 6 : shutting_down: false,
224 6 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
225 6 : };
226 6 :
227 6 : *self = UploadQueue::Initialized(state);
228 6 : Ok(self.initialized_mut().expect("we just set it"))
229 6 : }
230 :
231 5256 : pub(crate) fn initialized_mut(
232 5256 : &mut self,
233 5256 : ) -> Result<&mut UploadQueueInitialized, NotInitialized> {
234 : use UploadQueue::*;
235 5256 : match self {
236 0 : Uninitialized => Err(NotInitialized::Uninitialized),
237 5256 : Initialized(x) => {
238 5256 : if x.shutting_down {
239 0 : Err(NotInitialized::ShuttingDown)
240 : } else {
241 5256 : Ok(x)
242 : }
243 : }
244 0 : Stopped(_) => Err(NotInitialized::Stopped),
245 : }
246 5256 : }
247 :
248 2 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStoppedDeletable> {
249 2 : match self {
250 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
251 0 : anyhow::bail!("queue is in state {}", self.as_str())
252 : }
253 : UploadQueue::Stopped(UploadQueueStopped::Uninitialized) => {
254 0 : anyhow::bail!("queue is in state Stopped(Uninitialized)")
255 : }
256 2 : UploadQueue::Stopped(UploadQueueStopped::Deletable(deletable)) => Ok(deletable),
257 : }
258 2 : }
259 : }
260 :
261 : /// An in-progress upload or delete task.
262 : #[derive(Debug)]
263 : pub(crate) struct UploadTask {
264 : /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
265 : pub(crate) task_id: u64,
266 : pub(crate) retries: AtomicU32,
267 :
268 : pub(crate) op: UploadOp,
269 : }
270 :
271 : /// A deletion of some layers within the lifetime of a timeline. This is not used
272 : /// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
273 : #[derive(Debug)]
274 : pub(crate) struct Delete {
275 : pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
276 : }
277 :
278 : #[derive(Debug)]
279 : pub(crate) enum UploadOp {
280 : /// Upload a layer file
281 : UploadLayer(ResidentLayer, LayerFileMetadata),
282 :
283 : /// Upload a index_part.json file
284 : UploadMetadata {
285 : /// The next [`UploadQueueInitialized::clean`] after this upload succeeds.
286 : uploaded: Box<IndexPart>,
287 : },
288 :
289 : /// Delete layer files
290 : Delete(Delete),
291 :
292 : /// Barrier. When the barrier operation is reached, the channel is closed.
293 : Barrier(tokio::sync::watch::Sender<()>),
294 :
295 : /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
296 : /// this is the same as a Barrier.
297 : Shutdown,
298 : }
299 :
300 : impl std::fmt::Display for UploadOp {
301 0 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
302 0 : match self {
303 0 : UploadOp::UploadLayer(layer, metadata) => {
304 0 : write!(
305 0 : f,
306 0 : "UploadLayer({}, size={:?}, gen={:?})",
307 0 : layer, metadata.file_size, metadata.generation
308 0 : )
309 : }
310 0 : UploadOp::UploadMetadata { uploaded, .. } => {
311 0 : write!(
312 0 : f,
313 0 : "UploadMetadata(lsn: {})",
314 0 : uploaded.metadata.disk_consistent_lsn()
315 0 : )
316 : }
317 0 : UploadOp::Delete(delete) => {
318 0 : write!(f, "Delete({} layers)", delete.layers.len())
319 : }
320 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
321 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
322 : }
323 0 : }
324 : }
|