Line data Source code
1 : use super::storage_layer::LayerFileName;
2 : use super::storage_layer::ResidentLayer;
3 : use crate::tenant::metadata::TimelineMetadata;
4 : use crate::tenant::remote_timeline_client::index::IndexPart;
5 : use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
6 : use std::collections::{HashMap, VecDeque};
7 : use std::fmt::Debug;
8 :
9 : use chrono::NaiveDateTime;
10 : use std::sync::Arc;
11 : use tracing::info;
12 : use utils::lsn::AtomicLsn;
13 :
14 : use std::sync::atomic::AtomicU32;
15 : use utils::lsn::Lsn;
16 :
17 : #[cfg(feature = "testing")]
18 : use utils::generation::Generation;
19 :
/// Lifecycle state of a timeline's upload queue.
///
/// The `initialize_*` methods only accept `Uninitialized` and move the queue to
/// `Initialized`; `Stopped` keeps the last initialized state (see [`UploadQueueStopped`]).
//
// clippy warns that Uninitialized is much smaller than Initialized, which wastes
// memory for Uninitialized variants. Doesn't matter in practice, there are not
// that many upload queues in a running pageserver, and most of them are initialized
// anyway.
#[allow(clippy::large_enum_variant)]
pub(super) enum UploadQueue {
    /// Nothing has been loaded yet.
    Uninitialized,
    /// The queue tracks layer files, metadata and queued/in-progress operations.
    Initialized(UploadQueueInitialized),
    /// The queue no longer accepts work; `initialized_mut` reports `NotInitialized::Stopped`.
    Stopped(UploadQueueStopped),
}
30 :
31 : impl UploadQueue {
32 0 : pub fn as_str(&self) -> &'static str {
33 0 : match self {
34 0 : UploadQueue::Uninitialized => "Uninitialized",
35 0 : UploadQueue::Initialized(_) => "Initialized",
36 0 : UploadQueue::Stopped(_) => "Stopped",
37 : }
38 0 : }
39 : }
40 :
/// This keeps track of queued and in-progress tasks.
pub(crate) struct UploadQueueInitialized {
    /// Counter to assign task IDs
    pub(crate) task_counter: u64,

    /// All layer files stored in the remote storage, taking into account all
    /// in-progress and queued operations
    pub(crate) latest_files: HashMap<LayerFileName, LayerFileMetadata>,

    /// How many file uploads or deletions have been scheduled since the
    /// last (scheduling of) metadata index upload.
    pub(crate) latest_files_changes_since_metadata_upload_scheduled: u64,

    /// Metadata stored in the remote storage, taking into account all
    /// in-progress and queued operations.
    /// DANGER: do not return to outside world, e.g., safekeepers.
    pub(crate) latest_metadata: TimelineMetadata,

    /// `disk_consistent_lsn` from the last metadata file that was successfully
    /// uploaded, or `None` if nothing was uploaded yet.
    ///
    /// This may be ahead of `visible_remote_consistent_lsn`, which is only advanced
    /// after generation validation.
    pub(crate) projected_remote_consistent_lsn: Option<Lsn>,
    /// The remote consistent LSN that the outside world may observe; `Lsn(0)` if
    /// nothing was uploaded yet.
    ///
    /// Unlike `latest_files` or `latest_metadata`, this value is never ahead.
    /// Safekeeper can rely on it to make decisions for WAL storage.
    ///
    /// It is only updated after our generation has been validated with the control
    /// plane (unless a timeline's generation is None, in which case we skip validation).
    pub(crate) visible_remote_consistent_lsn: Arc<AtomicLsn>,

    /// Number of layer-upload tasks currently in progress.
    pub(crate) num_inprogress_layer_uploads: usize,
    /// Number of metadata-upload tasks currently in progress.
    pub(crate) num_inprogress_metadata_uploads: usize,
    /// Number of deletion tasks currently in progress.
    pub(crate) num_inprogress_deletions: usize,

    /// Tasks that are currently in-progress. In-progress means that a tokio Task
    /// has been launched for it. An in-progress task can be busy uploading, but it can
    /// also be waiting on the `concurrency_limiter` Semaphore in S3Bucket, or it can
    /// be waiting for retry in `exponential_backoff`.
    pub(crate) inprogress_tasks: HashMap<u64, Arc<UploadTask>>,

    /// Queued operations that have not been launched yet. They might depend on previous
    /// tasks to finish. For example, metadata upload cannot be performed before all
    /// preceding layer file uploads have completed.
    pub(crate) queued_operations: VecDeque<UploadOp>,

    /// Files which have been unlinked but have not yet had a deletion scheduled. Only kept
    /// around for error logging.
    ///
    /// This is behind a testing feature to catch problems in tests: if a bug caused
    /// entries to leak, it is better not to have this enabled in production builds.
    #[cfg(feature = "testing")]
    pub(crate) dangling_files: HashMap<LayerFileName, Generation>,

    /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
    /// While set, `UploadQueue::initialized_mut` returns `NotInitialized::ShuttingDown`.
    pub(crate) shutting_down: bool,

    /// Permitless semaphore (created with zero permits) on which any number of
    /// `RemoteTimelineClient::shutdown` futures can wait until one of them stops the queue.
    /// The semaphore is closed when `RemoteTimelineClient::launch_queued_tasks` encounters
    /// `UploadOp::Shutdown`.
    pub(crate) shutdown_ready: Arc<tokio::sync::Semaphore>,
}
102 :
103 : impl UploadQueueInitialized {
104 189 : pub(super) fn no_pending_work(&self) -> bool {
105 189 : self.inprogress_tasks.is_empty() && self.queued_operations.is_empty()
106 189 : }
107 :
108 798512 : pub(super) fn get_last_remote_consistent_lsn_visible(&self) -> Lsn {
109 798512 : self.visible_remote_consistent_lsn.load()
110 798512 : }
111 :
112 3042 : pub(super) fn get_last_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
113 3042 : self.projected_remote_consistent_lsn
114 3042 : }
115 : }
116 :
/// Progress of setting the "deleted" flag; stored in `UploadQueueStopped::deleted_at`.
#[derive(Clone, Copy)]
pub(super) enum SetDeletedFlagProgress {
    /// The operation has not been started.
    NotRunning,
    /// The operation is underway; the timestamp is presumably when it started — TODO confirm.
    InProgress(NaiveDateTime),
    /// The operation finished; the timestamp presumably records the deletion time — TODO confirm.
    Successful(NaiveDateTime),
}
123 :
/// State held by `UploadQueue::Stopped`.
pub(super) struct UploadQueueStopped {
    /// The initialized queue state carried over into the stopped state; presumably kept
    /// around for timeline deletion, as the name suggests — TODO confirm.
    pub(super) upload_queue_for_deletion: UploadQueueInitialized,
    /// Progress of setting the deleted flag.
    pub(super) deleted_at: SetDeletedFlagProgress,
}
128 :
/// Reason why the upload queue cannot hand out its initialized state; returned
/// (wrapped in `anyhow`) by `UploadQueue::initialized_mut`.
#[derive(thiserror::Error, Debug)]
pub(crate) enum NotInitialized {
    /// The queue is still `UploadQueue::Uninitialized`.
    #[error("queue is in state Uninitialized")]
    Uninitialized,
    /// The queue is `UploadQueue::Stopped`.
    // NOTE(review): the message says "Stopping" while the variant and `UploadQueue::as_str`
    // say "Stopped"; left as-is in case callers or tests match on the text.
    #[error("queue is in state Stopping")]
    Stopped,
    /// The queue is initialized but its `shutting_down` flag is set.
    #[error("queue is shutting down")]
    ShuttingDown,
}
138 :
139 : impl NotInitialized {
140 0 : pub(crate) fn is_stopping(&self) -> bool {
141 0 : use NotInitialized::*;
142 0 : match self {
143 0 : Uninitialized => false,
144 0 : Stopped => true,
145 0 : ShuttingDown => true,
146 : }
147 0 : }
148 : }
149 :
150 : impl UploadQueue {
151 1144 : pub(crate) fn initialize_empty_remote(
152 1144 : &mut self,
153 1144 : metadata: &TimelineMetadata,
154 1144 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
155 1144 : match self {
156 1144 : UploadQueue::Uninitialized => (),
157 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
158 0 : anyhow::bail!("already initialized, state {}", self.as_str())
159 : }
160 : }
161 :
162 1144 : info!("initializing upload queue for empty remote");
163 :
164 1144 : let state = UploadQueueInitialized {
165 1144 : // As described in the doc comment, it's ok for `latest_files` and `latest_metadata` to be ahead.
166 1144 : latest_files: HashMap::new(),
167 1144 : latest_files_changes_since_metadata_upload_scheduled: 0,
168 1144 : latest_metadata: metadata.clone(),
169 1144 : projected_remote_consistent_lsn: None,
170 1144 : visible_remote_consistent_lsn: Arc::new(AtomicLsn::new(0)),
171 1144 : // what follows are boring default initializations
172 1144 : task_counter: 0,
173 1144 : num_inprogress_layer_uploads: 0,
174 1144 : num_inprogress_metadata_uploads: 0,
175 1144 : num_inprogress_deletions: 0,
176 1144 : inprogress_tasks: HashMap::new(),
177 1144 : queued_operations: VecDeque::new(),
178 1144 : #[cfg(feature = "testing")]
179 1144 : dangling_files: HashMap::new(),
180 1144 : shutting_down: false,
181 1144 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
182 1144 : };
183 1144 :
184 1144 : *self = UploadQueue::Initialized(state);
185 1144 : Ok(self.initialized_mut().expect("we just set it"))
186 1144 : }
187 :
188 424 : pub(crate) fn initialize_with_current_remote_index_part(
189 424 : &mut self,
190 424 : index_part: &IndexPart,
191 424 : ) -> anyhow::Result<&mut UploadQueueInitialized> {
192 424 : match self {
193 424 : UploadQueue::Uninitialized => (),
194 : UploadQueue::Initialized(_) | UploadQueue::Stopped(_) => {
195 0 : anyhow::bail!("already initialized, state {}", self.as_str())
196 : }
197 : }
198 :
199 424 : let mut files = HashMap::with_capacity(index_part.layer_metadata.len());
200 57586 : for (layer_name, layer_metadata) in &index_part.layer_metadata {
201 57162 : files.insert(
202 57162 : layer_name.to_owned(),
203 57162 : LayerFileMetadata::from(layer_metadata),
204 57162 : );
205 57162 : }
206 :
207 424 : info!(
208 424 : "initializing upload queue with remote index_part.disk_consistent_lsn: {}",
209 424 : index_part.metadata.disk_consistent_lsn()
210 424 : );
211 :
212 424 : let state = UploadQueueInitialized {
213 424 : latest_files: files,
214 424 : latest_files_changes_since_metadata_upload_scheduled: 0,
215 424 : latest_metadata: index_part.metadata.clone(),
216 424 : projected_remote_consistent_lsn: Some(index_part.metadata.disk_consistent_lsn()),
217 424 : visible_remote_consistent_lsn: Arc::new(
218 424 : index_part.metadata.disk_consistent_lsn().into(),
219 424 : ),
220 424 : // what follows are boring default initializations
221 424 : task_counter: 0,
222 424 : num_inprogress_layer_uploads: 0,
223 424 : num_inprogress_metadata_uploads: 0,
224 424 : num_inprogress_deletions: 0,
225 424 : inprogress_tasks: HashMap::new(),
226 424 : queued_operations: VecDeque::new(),
227 424 : #[cfg(feature = "testing")]
228 424 : dangling_files: HashMap::new(),
229 424 : shutting_down: false,
230 424 : shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
231 424 : };
232 424 :
233 424 : *self = UploadQueue::Initialized(state);
234 424 : Ok(self.initialized_mut().expect("we just set it"))
235 424 : }
236 :
237 28221 : pub(crate) fn initialized_mut(&mut self) -> anyhow::Result<&mut UploadQueueInitialized> {
238 28221 : use UploadQueue::*;
239 28221 : match self {
240 0 : Uninitialized => Err(NotInitialized::Uninitialized.into()),
241 28207 : Initialized(x) => {
242 28207 : if x.shutting_down {
243 0 : Err(NotInitialized::ShuttingDown.into())
244 : } else {
245 28207 : Ok(x)
246 : }
247 : }
248 14 : Stopped(_) => Err(NotInitialized::Stopped.into()),
249 : }
250 28221 : }
251 :
252 569 : pub(crate) fn stopped_mut(&mut self) -> anyhow::Result<&mut UploadQueueStopped> {
253 569 : match self {
254 : UploadQueue::Initialized(_) | UploadQueue::Uninitialized => {
255 0 : anyhow::bail!("queue is in state {}", self.as_str())
256 : }
257 569 : UploadQueue::Stopped(stopped) => Ok(stopped),
258 : }
259 569 : }
260 : }
261 :
/// An in-progress upload or delete task.
#[derive(Debug)]
pub(crate) struct UploadTask {
    /// Unique ID of this task. Used as the key in `UploadQueueInitialized::inprogress_tasks`.
    pub(crate) task_id: u64,
    /// Retry counter for this task. It is atomic, presumably so that it can be bumped
    /// through the shared `Arc<UploadTask>` — TODO confirm.
    pub(crate) retries: AtomicU32,

    /// The operation this task performs.
    pub(crate) op: UploadOp,
}
271 :
/// A deletion of some layers within the lifetime of a timeline. This is not used
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
#[derive(Debug)]
pub(crate) struct Delete {
    /// The layers to delete, with their metadata.
    pub(crate) layers: Vec<(LayerFileName, LayerFileMetadata)>,
}
278 :
/// An operation in the upload queue.
#[derive(Debug)]
pub(crate) enum UploadOp {
    /// Upload a layer file
    UploadLayer(ResidentLayer, LayerFileMetadata),

    /// Upload the metadata file (index part). The `Lsn` is what `Display` prints as
    /// `lsn`; presumably the index part's `disk_consistent_lsn` — TODO confirm.
    UploadMetadata(IndexPart, Lsn),

    /// Delete layer files
    Delete(Delete),

    /// Barrier: an ordering point in the queue. NOTE(review): the original doc was cut off
    /// ("When the barrier operation is reached,"). The `watch::Sender` is presumably used to
    /// notify a waiter once all preceding operations have completed — TODO confirm.
    Barrier(tokio::sync::watch::Sender<()>),

    /// Shutdown; upon encountering this operation no new operations will be spawned, otherwise
    /// this is the same as a Barrier.
    Shutdown,
}
297 :
298 : impl std::fmt::Display for UploadOp {
299 4945 : fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
300 4945 : match self {
301 3641 : UploadOp::UploadLayer(layer, metadata) => {
302 3641 : write!(
303 3641 : f,
304 3641 : "UploadLayer({}, size={:?}, gen={:?})",
305 3641 : layer,
306 3641 : metadata.file_size(),
307 3641 : metadata.generation
308 3641 : )
309 : }
310 1304 : UploadOp::UploadMetadata(_, lsn) => {
311 1304 : write!(f, "UploadMetadata(lsn: {})", lsn)
312 : }
313 0 : UploadOp::Delete(delete) => {
314 0 : write!(f, "Delete({} layers)", delete.layers.len())
315 : }
316 0 : UploadOp::Barrier(_) => write!(f, "Barrier"),
317 0 : UploadOp::Shutdown => write!(f, "Shutdown"),
318 : }
319 4945 : }
320 : }
|