#![recursion_limit = "300"]
#![deny(clippy::undocumented_unsafe_blocks)]

mod auth;
pub mod basebackup;
pub mod config;
pub mod consumption_metrics;
pub mod context;
pub mod control_plane_client;
pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;

use futures::{stream::FuturesUnordered, StreamExt};
pub use pageserver_api::keyspace;
use tokio_util::sync::CancellationToken;
mod assert_u64_eq_usize;
pub mod aux_file;
pub mod metrics;
pub mod page_cache;
pub mod page_service;
pub mod pgdatadir_mapping;
pub mod repository;
pub mod span;
pub(crate) mod statvfs;
pub mod task_mgr;
pub mod tenant;
pub mod utilization;
pub mod virtual_file;
pub mod walingest;
pub mod walrecord;
pub mod walredo;

use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tenant::{
    mgr::{BackgroundPurges, TenantManager},
    secondary,
};
use tracing::{info, info_span};

/// Current storage format version
///
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;

pub const DEFAULT_PG_VERSION: u32 = 16;

// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

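/// An 8 KiB page of zeros (8192 bytes matches the default Postgres block size,
/// `BLCKSZ`), shared wherever an all-zeros page is needed.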
static ZERO_PAGE: bytes::Bytes = bytes::Bytes::from_static(&[0u8; 8192]);

pub use crate::metrics::preinitialize_metrics;

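/// A handle to a spawned task plus the [`CancellationToken`] that tells it to
/// shut down. A minimal usage sketch (the task body here is hypothetical, not
/// from this crate):
///
/// ```rust,ignore
/// let cancel = CancellationToken::new();
/// let task = tokio::spawn({
///     let cancel = cancel.clone();
///     async move {
///         // ... serve requests until cancellation is requested ...
///         cancel.cancelled().await;
///     }
/// });
/// let http = HttpEndpointListener(CancellableTask { task, cancel });
/// // Later, during shutdown: http.0.shutdown().await;
/// ```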
pub struct CancellableTask {
    pub task: tokio::task::JoinHandle<()>,
    pub cancel: CancellationToken,
}
pub struct HttpEndpointListener(pub CancellableTask);
pub struct ConsumptionMetricsTasks(pub CancellableTask);
pub struct DiskUsageEvictionTask(pub CancellableTask);
impl CancellableTask {
    pub async fn shutdown(self) {
        self.cancel.cancel();
        self.task.await.unwrap();
    }
}

#[tracing::instrument(skip_all, fields(%exit_code))]
#[allow(clippy::too_many_arguments)]
pub async fn shutdown_pageserver(
    http_listener: HttpEndpointListener,
    page_service: page_service::Listener,
    consumption_metrics_worker: ConsumptionMetricsTasks,
    disk_usage_eviction_task: Option<DiskUsageEvictionTask>,
    tenant_manager: &TenantManager,
    background_purges: BackgroundPurges,
    mut deletion_queue: DeletionQueue,
    secondary_controller_tasks: secondary::GlobalTasks,
    exit_code: i32,
) {
    use std::time::Duration;

    let started_at = std::time::Instant::now();

    // If the orderly shutdown below takes too long, we still want to make
    // sure that all walredo processes are killed and wait()ed on by us, not systemd.
    //
    // (Leftover walredo processes are the hypothesized trigger for the systemd freezes
    // that we keep seeing in prod => https://github.com/neondatabase/cloud/issues/11387.)
    //
    // We use a thread instead of a tokio task because the background runtime is likely busy
    // with the final flushing / uploads. This activity here has priority, and due to the lack
    // of a scheduling priority feature in the tokio scheduler, using a separate thread is
    // an effective priority booster.
    let walredo_extraordinary_shutdown_thread_span = {
        let span = info_span!(parent: None, "walredo_extraordinary_shutdown_thread");
        span.follows_from(tracing::Span::current());
        span
    };
    let walredo_extraordinary_shutdown_thread_cancel = CancellationToken::new();
    let walredo_extraordinary_shutdown_thread = std::thread::spawn({
        let walredo_extraordinary_shutdown_thread_cancel =
            walredo_extraordinary_shutdown_thread_cancel.clone();
        move || {
            let rt = tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap();
            let _entered = rt.enter();
            let _entered = walredo_extraordinary_shutdown_thread_span.enter();
            if let Ok(()) = rt.block_on(tokio::time::timeout(
                Duration::from_secs(8),
                walredo_extraordinary_shutdown_thread_cancel.cancelled(),
            )) {
                info!("cancellation requested");
                return;
            }
            let managers = tenant::WALREDO_MANAGERS
                .lock()
                .unwrap()
                // prevents new walredo managers from being inserted
                .take()
                .expect("only we take()");
            // Use FuturesUnordered to get in queue early for each manager's
            // heavier_once_cell semaphore wait list.
            // Also, for idle tenants that for some reason haven't
            // shut down yet, it's quite likely that we're not going
            // to get Poll::Pending even once.
            let mut futs: FuturesUnordered<_> = managers
                .into_iter()
                .filter_map(|(_, mgr)| mgr.upgrade())
                .map(|mgr| async move { tokio::task::unconstrained(mgr.shutdown()).await })
                .collect();
            info!(count=%futs.len(), "built FuturesUnordered");
            let mut last_log_at = std::time::Instant::now();
            #[derive(Debug, Default)]
            struct Results {
                initiated: u64,
                already: u64,
            }
            let mut results = Results::default();
            while let Some(we_initiated) = rt.block_on(futs.next()) {
                if we_initiated {
                    results.initiated += 1;
                } else {
                    results.already += 1;
                }
                if last_log_at.elapsed() > Duration::from_millis(100) {
                    info!(remaining=%futs.len(), ?results, "progress");
                    last_log_at = std::time::Instant::now();
                }
            }
            info!(?results, "done");
        }
    });

    // Shut down the libpq endpoint task. This prevents new connections from
    // being accepted.
    let remaining_connections = timed(
        page_service.stop_accepting(),
        "shutdown LibpqEndpointListener",
        Duration::from_secs(1),
    )
    .await;

    // Shut down all the tenants. This flushes everything to disk and kills
    // the checkpoint and GC tasks.
    timed(
        tenant_manager.shutdown(),
        "shutdown all tenants",
        Duration::from_secs(5),
    )
    .await;

    // Shut down any page service tasks: any in-progress work for particular timelines or tenants
    // should already have been cancelled via mgr::shutdown_all_tenants
    timed(
        remaining_connections.shutdown(),
        "shutdown PageRequestHandlers",
        Duration::from_secs(1),
    )
    .await;

    // Best effort to persist any outstanding deletions, to avoid leaking objects
    deletion_queue.shutdown(Duration::from_secs(5)).await;

    timed(
        consumption_metrics_worker.0.shutdown(),
        "shutdown consumption metrics",
        Duration::from_secs(1),
    )
    .await;

    timed(
        futures::future::OptionFuture::from(disk_usage_eviction_task.map(|t| t.0.shutdown())),
        "shutdown disk usage eviction",
        Duration::from_secs(1),
    )
    .await;

    timed(
        background_purges.shutdown(),
        "shutdown background purges",
        Duration::from_secs(1),
    )
    .await;

    // Shut down the HTTP endpoint last, so that you can still check the server's
    // status while it's shutting down.
    // FIXME: We should probably stop accepting commands like attach/detach earlier.
    timed(
        http_listener.0.shutdown(),
        "shutdown http",
        Duration::from_secs(1),
    )
    .await;

    timed(
        secondary_controller_tasks.wait(), // cancellation happened in caller
        "secondary controller wait",
        Duration::from_secs(1),
    )
    .await;

    // There should be nothing left, but let's be sure
    timed(
        task_mgr::shutdown_tasks(None, None, None),
        "shutdown leftovers",
        Duration::from_secs(1),
    )
    .await;

    info!("cancel & join walredo_extraordinary_shutdown_thread");
    walredo_extraordinary_shutdown_thread_cancel.cancel();
    walredo_extraordinary_shutdown_thread.join().unwrap();
    info!("walredo_extraordinary_shutdown_thread done");

    info!(
        elapsed_ms = started_at.elapsed().as_millis(),
        "Shutdown successfully completed"
    );
    std::process::exit(exit_code);
}

/// Per-tenant configuration file.
/// Full path: `tenants/<tenant_id>/config-v1`.
pub(crate) const TENANT_LOCATION_CONFIG_NAME: &str = "config-v1";

/// Per-tenant copy of their remote heatmap, downloaded into the local
/// tenant path while in secondary mode.
pub(crate) const TENANT_HEATMAP_BASENAME: &str = "heatmap-v1.json";

/// A suffix used for various temporary files. Any temporary files found in the
/// data directory at pageserver startup can be automatically removed.
pub(crate) const TEMP_FILE_SUFFIX: &str = "___temp";

/// A marker file to mark that a timeline directory was not fully initialized.
/// If a timeline directory with this marker is encountered at pageserver startup,
/// the timeline directory and the marker file are both removed.
/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>___uninit`.
pub(crate) const TIMELINE_UNINIT_MARK_SUFFIX: &str = "___uninit";

pub(crate) const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";

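/// Returns true if the path's file name ends in [`TEMP_FILE_SUFFIX`]; such
/// files are leftovers that are safe to remove at pageserver startup (see the
/// doc comment on the constant above).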
pub fn is_temporary(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TEMP_FILE_SUFFIX)
}

fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
    match path.file_name() {
        Some(name) => name.ends_with(suffix),
        None => false,
    }
}

// FIXME: DO NOT ADD new query methods like this, whose next step would be parsing the
// timeline id out of the directory name. Instead, create a type "UninitMark(TimelineId)"
// and parse it only once from the name.

pub(crate) fn is_uninit_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}

pub(crate) fn is_delete_mark(path: &Utf8Path) -> bool {
    ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}

/// During pageserver startup, we need to order operations so that we do not exhaust tokio
/// worker threads by blocking.
///
/// Instances of this value exist only during startup; otherwise `None` is provided, meaning no
/// delaying is needed.
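///
/// A sketch of how startup code might wire this up (assuming a
/// `utils::completion::channel()` constructor that returns a paired
/// `(Completion, Barrier)`, where the barrier opens once every clone of the
/// completion has been dropped):
///
/// ```rust,ignore
/// let (load_done, background_jobs_can_start) = utils::completion::channel();
/// let order = InitializationOrder {
///     initial_tenant_load_remote: Some(load_done.clone()),
///     initial_tenant_load: Some(load_done),
///     background_jobs_can_start,
/// };
/// // A background task delays itself until initial loads have completed:
/// // order.background_jobs_can_start.wait().await;
/// ```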
#[derive(Clone)]
pub struct InitializationOrder {
    /// Each initial tenant load task carries this until it is done loading timelines from remote storage.
    pub initial_tenant_load_remote: Option<utils::completion::Completion>,

    /// Each initial tenant load task carries this until completion.
    pub initial_tenant_load: Option<utils::completion::Completion>,

    /// Barrier for when we can start any background jobs.
    ///
    /// This can be broken up later on, but right now there is just one class of background job.
    pub background_jobs_can_start: utils::completion::Barrier,
}

/// Time the future with a warning when it exceeds a threshold.
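///
/// A minimal usage sketch (the names here are illustrative, not from this crate):
///
/// ```rust,ignore
/// // Logs "completed" if `flush_fut` finishes within a second; otherwise logs
/// // "still waiting ...", keeps awaiting it, and warns once it finally finishes.
/// let res = timed(flush_fut, "final flush", Duration::from_secs(1)).await;
/// ```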
async fn timed<Fut: std::future::Future>(
    fut: Fut,
    name: &str,
    warn_at: std::time::Duration,
) -> <Fut as std::future::Future>::Output {
    let started = std::time::Instant::now();

    let mut fut = std::pin::pin!(fut);

    match tokio::time::timeout(warn_at, &mut fut).await {
        Ok(ret) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed"
            );
            ret
        }
        Err(_) => {
            tracing::info!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "still waiting, taking longer than expected..."
            );

            let ret = fut.await;

            // this has a global allowed_errors
            tracing::warn!(
                stage = name,
                elapsed_ms = started.elapsed().as_millis(),
                "completed, took longer than expected"
            );

            ret
        }
    }
}

#[cfg(test)]
mod timed_tests {
    use super::timed;
    use std::time::Duration;

    #[tokio::test]
    async fn timed_completes_when_inner_future_completes() {
        // A future that completes on time should have its result returned
        let r1 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(10)).await;
                123
            },
            "test 1",
            Duration::from_millis(50),
        )
        .await;
        assert_eq!(r1, 123);

        // A future that completes too slowly should also have its result returned
        let r2 = timed(
            async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                456
            },
            "test 2",
            Duration::from_millis(10),
        )
        .await;
        assert_eq!(r2, 456);
    }
}