Line data Source code
1 : //! Common traits and structs for layers
2 :
3 : pub mod batch_split_writer;
4 : pub mod delta_layer;
5 : pub mod errors;
6 : pub mod filter_iterator;
7 : pub mod image_layer;
8 : pub mod inmemory_layer;
9 : pub(crate) mod layer;
10 : mod layer_desc;
11 : mod layer_name;
12 : pub mod merge_iterator;
13 :
14 : use std::cmp::Ordering;
15 : use std::collections::hash_map::Entry;
16 : use std::collections::{BinaryHeap, HashMap};
17 : use std::ops::Range;
18 : use std::pin::Pin;
19 : use std::sync::Arc;
20 : use std::sync::atomic::AtomicUsize;
21 : use std::time::{Duration, SystemTime, UNIX_EPOCH};
22 :
23 : use crate::PERF_TRACE_TARGET;
24 : pub use batch_split_writer::{BatchLayerWriter, SplitDeltaLayerWriter, SplitImageLayerWriter};
25 : use bytes::Bytes;
26 : pub use delta_layer::{DeltaLayer, DeltaLayerWriter, ValueRef};
27 : use futures::StreamExt;
28 : use futures::stream::FuturesUnordered;
29 : pub use image_layer::{ImageLayer, ImageLayerWriter};
30 : pub use inmemory_layer::InMemoryLayer;
31 : pub(crate) use layer::{EvictionError, Layer, ResidentLayer};
32 : pub use layer_desc::{PersistentLayerDesc, PersistentLayerKey};
33 : pub use layer_name::{DeltaLayerName, ImageLayerName, LayerName};
34 : use pageserver_api::config::GetVectoredConcurrentIo;
35 : use pageserver_api::key::Key;
36 : use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
37 : use tracing::{Instrument, info_span, trace};
38 : use utils::lsn::Lsn;
39 : use utils::sync::gate::GateGuard;
40 : use wal_decoder::models::record::NeonWalRecord;
41 : use wal_decoder::models::value::Value;
42 :
43 : use self::inmemory_layer::InMemoryLayerFileId;
44 : use super::PageReconstructError;
45 : use super::layer_map::InMemoryLayerDesc;
46 : use super::timeline::{GetVectoredError, ReadPath};
47 : use crate::context::{
48 : AccessStatsBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
49 : };
50 :
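    : /// Whether two (half-open) ranges overlap.
    : ///
    : /// Example behavior (illustrative; it follows directly from the implementation below):
    : ///
    : /// ```ignore
    : /// assert!(range_overlaps(&(1..5), &(4..8)));  // 4 lies in both ranges
    : /// assert!(!range_overlaps(&(1..3), &(3..5))); // half-open: 3 is excluded from 1..3
    : /// assert!(range_overlaps(&(0..10), &(2..3))); // containment counts as overlap
    : /// ```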
51 0 : pub fn range_overlaps<T>(a: &Range<T>, b: &Range<T>) -> bool
52 0 : where
53 0 : T: PartialOrd<T>,
54 : {
55 0 : if a.start < b.start {
56 0 : a.end > b.start
57 : } else {
58 0 : b.end > a.start
59 : }
60 0 : }
61 :
62 : /// Struct used to communicate across calls to 'get_value_reconstruct_data'.
63 : ///
64 : /// Before first call, you can fill in 'page_img' if you have an older cached
65 : /// version of the page available. That can save work in
66 : /// 'get_value_reconstruct_data', as it can stop searching for page versions
67 : /// when all the WAL records going back to the cached image have been collected.
68 : ///
69 : /// When get_value_reconstruct_data returns Complete, 'img' is set to an image
70 : /// of the page, or the oldest WAL record in 'records' is a will_init-type
71 : /// record that initializes the page without requiring a previous image.
72 : ///
73 : /// If 'get_value_reconstruct_data' returns Continue, some 'records' may have
74 : /// been collected, but there are more records outside the current layer. Pass
75 : /// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
76 : /// call, to collect more records.
77 : ///
78 : #[derive(Debug, Default, Clone)]
79 : pub(crate) struct ValueReconstructState {
80 : pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
81 : pub(crate) img: Option<(Lsn, Bytes)>,
82 : }
83 :
84 : impl ValueReconstructState {
85 : /// Returns the number of page deltas applied to the page image.
86 727070 : pub fn num_deltas(&self) -> usize {
87 727070 : match self.img {
88 699602 : Some(_) => self.records.len(),
89 27468 : None => self.records.len() - 1, // omit will_init record
90 : }
91 727070 : }
92 : }
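    : // A minimal sketch (hypothetical test, not part of the original file) of the
    : // contract described on `ValueReconstructState`: seeding `img` with an older
    : // cached page image means the collected WAL records are exactly the deltas to
    : // apply on top of that image, which is what `num_deltas` reports.
    : #[cfg(test)]
    : #[test]
    : fn value_reconstruct_state_cached_image_sketch() {
    : let mut state = ValueReconstructState::default();
    : // Pretend we had a cached copy of the page at LSN 0x10.
    : state.img = Some((Lsn(0x10), Bytes::from_static(b"cached page image")));
    : // No newer WAL records have been collected yet, so there are zero deltas.
    : assert_eq!(state.num_deltas(), 0);
    : }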
93 :
94 : #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
95 : pub(crate) enum ValueReconstructSituation {
96 : Complete,
97 : #[default]
98 : Continue,
99 : }
100 :
101 : /// On disk representation of a value loaded in a buffer
102 : #[derive(Debug)]
103 : pub(crate) enum OnDiskValue {
104 : /// Unencoded [`Value::Image`]
105 : RawImage(Bytes),
106 : /// Encoded [`Value`]. Can deserialize into an image or a WAL record
107 : WalRecordOrImage(Bytes),
108 : }
109 :
110 : /// Reconstruct data accumulated for a single key during a vectored get
111 : #[derive(Debug, Default)]
112 : pub struct VectoredValueReconstructState {
113 : pub(crate) on_disk_values: Vec<(Lsn, OnDiskValueIoWaiter)>,
114 :
115 : pub(crate) situation: ValueReconstructSituation,
116 : }
117 :
118 : #[derive(Debug)]
119 : pub(crate) struct OnDiskValueIoWaiter {
120 : rx: tokio::sync::oneshot::Receiver<OnDiskValueIoResult>,
121 : }
122 :
123 : #[derive(Debug)]
124 : #[must_use]
125 : pub(crate) enum OnDiskValueIo {
126 : /// Traversal identified this IO as required to complete the vectored get.
127 : Required {
128 : num_active_ios: Arc<AtomicUsize>,
129 : tx: tokio::sync::oneshot::Sender<OnDiskValueIoResult>,
130 : },
131 : /// Sparse keyspace reads always read all the values for a given key,
132 : /// even though only the first value is needed.
133 : ///
134 : /// This variant represents the unnecessary IOs for those values at lower LSNs
135 : /// that aren't needed, but are currently still being done.
136 : ///
137 : /// The execution of unnecessary IOs was a pre-existing behavior before concurrent IO.
138 : /// We added this explicit representation here so that we can drop
139 : /// unnecessary IO results immediately, instead of buffering them in
140 : /// `oneshot` channels inside [`VectoredValueReconstructState`] until
141 : /// [`VectoredValueReconstructState::collect_pending_ios`] gets called.
142 : Unnecessary,
143 : }
144 :
145 : type OnDiskValueIoResult = Result<OnDiskValue, std::io::Error>;
146 :
147 : impl OnDiskValueIo {
148 1797835 : pub(crate) fn complete(self, res: OnDiskValueIoResult) {
149 1797835 : match self {
150 1753091 : OnDiskValueIo::Required { num_active_ios, tx } => {
151 1753091 : num_active_ios.fetch_sub(1, std::sync::atomic::Ordering::Release);
152 1753091 : let _ = tx.send(res);
153 1753091 : }
154 44744 : OnDiskValueIo::Unnecessary => {
155 44744 : // Nobody cared, see variant doc comment.
156 44744 : }
157 : }
158 1797835 : }
159 : }
160 :
161 : #[derive(Debug, thiserror::Error)]
162 : pub(crate) enum WaitCompletionError {
163 : #[error("OnDiskValueIo was dropped without completing, likely the sidecar task panicked")]
164 : IoDropped,
165 : }
166 :
167 : impl OnDiskValueIoWaiter {
168 1753090 : pub(crate) async fn wait_completion(self) -> Result<OnDiskValueIoResult, WaitCompletionError> {
169 : // NB: for Unnecessary IOs, this method never gets called because we don't add them to `on_disk_values`.
170 1753090 : self.rx.await.map_err(|_| WaitCompletionError::IoDropped)
171 1753090 : }
172 : }
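    : // How the two halves above pair up on the read path: `ValuesReconstructState::update_key`
    : // (further down in this file) stores an `OnDiskValueIoWaiter` in
    : // `VectoredValueReconstructState::on_disk_values` and hands the matching
    : // `OnDiskValueIo::Required` to the IO future. The IO future eventually calls
    : // `complete(..)`, which decrements `num_active_ios` and sends the result over the
    : // oneshot channel; `collect_pending_ios` then awaits each waiter's `wait_completion()`.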
173 :
174 : impl VectoredValueReconstructState {
175 : /// # Cancel-Safety
176 : ///
177 : /// Technically fine to stop polling this future, but, the IOs will still
178 : /// be executed to completion by the sidecar task and hold on to / consume resources.
179 : /// Better not to do it, to make reasoning about the system easier.
180 363570 : pub(crate) async fn collect_pending_ios(
181 363570 : self,
182 363570 : ) -> Result<ValueReconstructState, PageReconstructError> {
183 : use utils::bin_ser::BeSer;
184 :
185 363570 : let mut res = Ok(ValueReconstructState::default());
186 :
187 : // We should try hard not to bail early, so that by the time we return from this
188 : // function, all IO for this value is done. It's not required -- we could totally
189 : // stop polling the IO futures in the sidecar task, they need to support that,
190 : // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
191 : // to reason about the system if we just wait for all IO to complete, even if
192 : // we're no longer interested in the result.
193 : //
194 : // Revisit this when IO futures are replaced with a more sophisticated IO system
195 : // and an IO scheduler, where we know which IOs were submitted and which ones
196 : // just queued. Cf the comment on IoConcurrency::spawn_io.
197 2116660 : for (lsn, waiter) in self.on_disk_values {
198 1753090 : let value_recv_res = waiter
199 1753090 : .wait_completion()
200 1753090 : // we rely on the caller to poll us to completion, so this is not a bail point
201 1753090 : .await;
202 : // Force not bailing early by wrapping the code into a closure.
203 : #[allow(clippy::redundant_closure_call)]
204 1753090 : let _: () = (|| {
205 1753090 : match (&mut res, value_recv_res) {
206 0 : (Err(_), _) => {
207 0 : // We've already failed, no need to process more.
208 0 : }
209 0 : (Ok(_), Err(wait_err)) => {
210 0 : // This shouldn't happen - likely the sidecar task panicked.
211 0 : res = Err(PageReconstructError::Other(wait_err.into()));
212 0 : }
213 0 : (Ok(_), Ok(Err(err))) => {
214 0 : let err: std::io::Error = err;
215 0 : // TODO: returning IO error here will fail a compute query.
216 0 : // Probably not what we want, we're not doing `maybe_fatal_err`
217 0 : // in the IO futures.
218 0 : // But it's been like that for a long time, not changing it
219 0 : // as part of concurrent IO.
220 0 : // => https://github.com/neondatabase/neon/issues/10454
221 0 : res = Err(PageReconstructError::Other(err.into()));
222 0 : }
223 23857 : (Ok(ok), Ok(Ok(OnDiskValue::RawImage(img)))) => {
224 23857 : assert!(ok.img.is_none());
225 23857 : ok.img = Some((lsn, img));
226 : }
227 1729233 : (Ok(ok), Ok(Ok(OnDiskValue::WalRecordOrImage(buf)))) => {
228 1729233 : match Value::des(&buf) {
229 1403254 : Ok(Value::WalRecord(rec)) => {
230 1403254 : ok.records.push((lsn, rec));
231 1403254 : }
232 325979 : Ok(Value::Image(img)) => {
233 325979 : assert!(ok.img.is_none());
234 325979 : ok.img = Some((lsn, img));
235 : }
236 0 : Err(err) => {
237 0 : res = Err(PageReconstructError::Other(err.into()));
238 0 : }
239 : }
240 : }
241 : }
242 : })();
243 : }
244 :
245 363570 : res
246 363570 : }
247 :
248 : /// Benchmarking utility to await for the completion of all pending ios
249 : ///
250 : /// # Cancel-Safety
251 : ///
252 : /// Technically fine to stop polling this future, but, the IOs will still
253 : /// be executed to completion by the sidecar task and hold on to / consume resources.
254 : /// Better not to do it, to make reasoning about the system easier.
255 : #[cfg(feature = "benchmarking")]
256 : pub async fn sink_pending_ios(self) -> Result<(), std::io::Error> {
257 : let mut res = Ok(());
258 :
259 : // We should try hard not to bail early, so that by the time we return from this
260 : // function, all IO for this value is done. It's not required -- we could totally
261 : // stop polling the IO futures in the sidecar task, they need to support that,
262 : // but just stopping to poll doesn't reduce the IO load on the disk. It's easier
263 : // to reason about the system if we just wait for all IO to complete, even if
264 : // we're no longer interested in the result.
265 : //
266 : // Revisit this when IO futures are replaced with a more sophisticated IO system
267 : // and an IO scheduler, where we know which IOs were submitted and which ones
268 : // just queued. Cf the comment on IoConcurrency::spawn_io.
269 : for (_lsn, waiter) in self.on_disk_values {
270 : let value_recv_res = waiter
271 : .wait_completion()
272 : // we rely on the caller to poll us to completion, so this is not a bail point
273 : .await;
274 :
275 : match (&mut res, value_recv_res) {
276 : (Err(_), _) => {
277 : // We've already failed, no need to process more.
278 : }
279 : (Ok(_), Err(_wait_err)) => {
280 : // This shouldn't happen - likely the sidecar task panicked.
281 : unreachable!();
282 : }
283 : (Ok(_), Ok(Err(err))) => {
284 : let err: std::io::Error = err;
285 : res = Err(err);
286 : }
287 : (Ok(_ok), Ok(Ok(OnDiskValue::RawImage(_img)))) => {}
288 : (Ok(_ok), Ok(Ok(OnDiskValue::WalRecordOrImage(_buf)))) => {}
289 : }
290 : }
291 :
292 : res
293 : }
294 : }
295 :
296 : /// Bag of data accumulated during a vectored get.
297 : pub struct ValuesReconstructState {
298 : /// The keys will be removed after `get_vectored` completes. The caller outside `Timeline`
299 : /// should not expect to get anything from this hashmap.
300 : pub keys: HashMap<Key, VectoredValueReconstructState>,
301 : /// The keys which are already retrieved
302 : keys_done: KeySpaceRandomAccum,
303 :
304 : /// The keys covered by the image layers
305 : keys_with_image_coverage: Option<Range<Key>>,
306 :
307 : // Statistics that remain accessible to the caller of `get_vectored_impl`.
308 : layers_visited: u32,
309 : delta_layers_visited: u32,
310 :
311 : pub(crate) enable_debug: bool,
312 : pub(crate) debug_state: ValueReconstructState,
313 :
314 : pub(crate) io_concurrency: IoConcurrency,
315 : num_active_ios: Arc<AtomicUsize>,
316 :
317 : pub(crate) read_path: Option<ReadPath>,
318 : }
319 :
320 : /// The level of IO concurrency to be used on the read path
321 : ///
322 : /// The desired end state is that we always do parallel IO.
323 : /// This struct and the dispatching in the impl will be removed once
324 : /// we've built enough confidence.
325 : pub enum IoConcurrency {
326 : Sequential,
327 : SidecarTask {
328 : task_id: usize,
329 : ios_tx: tokio::sync::mpsc::UnboundedSender<IoFuture>,
330 : },
331 : }
332 :
333 : type IoFuture = Pin<Box<dyn Send + Future<Output = ()>>>;
334 :
335 : pub(crate) enum SelectedIoConcurrency {
336 : Sequential,
337 : SidecarTask(GateGuard),
338 : }
339 :
340 : impl std::fmt::Debug for IoConcurrency {
341 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
342 0 : match self {
343 0 : IoConcurrency::Sequential => write!(f, "Sequential"),
344 0 : IoConcurrency::SidecarTask { .. } => write!(f, "SidecarTask"),
345 : }
346 0 : }
347 : }
348 :
349 : impl std::fmt::Debug for SelectedIoConcurrency {
350 17 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351 17 : match self {
352 0 : SelectedIoConcurrency::Sequential => write!(f, "Sequential"),
353 17 : SelectedIoConcurrency::SidecarTask(_) => write!(f, "SidecarTask"),
354 : }
355 17 : }
356 : }
357 :
358 : impl IoConcurrency {
359 : /// Force sequential IO. This is a temporary workaround until we have
360 : /// plumbed `IoConcurrency` through the call stack
361 : /// into `RequestContext`.
362 : ///
363 : /// DO NOT USE for new code.
364 : ///
365 : /// Tracking issue: <https://github.com/neondatabase/neon/issues/10460>.
366 301278 : pub(crate) fn sequential() -> Self {
367 301278 : Self::spawn(SelectedIoConcurrency::Sequential)
368 301278 : }
369 :
370 321 : pub fn spawn_from_conf(conf: GetVectoredConcurrentIo, gate_guard: GateGuard) -> IoConcurrency {
371 321 : let selected = match conf {
372 0 : GetVectoredConcurrentIo::Sequential => SelectedIoConcurrency::Sequential,
373 321 : GetVectoredConcurrentIo::SidecarTask => SelectedIoConcurrency::SidecarTask(gate_guard),
374 : };
375 321 : Self::spawn(selected)
376 321 : }
377 :
378 301616 : pub(crate) fn spawn(io_concurrency: SelectedIoConcurrency) -> Self {
379 301616 : match io_concurrency {
380 301278 : SelectedIoConcurrency::Sequential => IoConcurrency::Sequential,
381 338 : SelectedIoConcurrency::SidecarTask(gate_guard) => {
382 338 : let (ios_tx, ios_rx) = tokio::sync::mpsc::unbounded_channel();
383 : static TASK_ID: AtomicUsize = AtomicUsize::new(0);
384 338 : let task_id = TASK_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
385 : // TODO: enrich the span with more context (tenant,shard,timeline) + (basebackup|pagestream|...)
386 338 : let span =
387 338 : tracing::info_span!(parent: None, "IoConcurrency_sidecar", task_id = task_id);
388 338 : trace!(task_id, "spawning sidecar task");
389 338 : tokio::spawn(async move {
390 337 : trace!("start");
391 337 : scopeguard::defer!{ trace!("end") };
392 : type IosRx = tokio::sync::mpsc::UnboundedReceiver<IoFuture>;
393 : enum State {
394 : Waiting {
395 : // invariant: is_empty(), but we recycle the allocation
396 : empty_futures: FuturesUnordered<IoFuture>,
397 : ios_rx: IosRx,
398 : },
399 : Executing {
400 : futures: FuturesUnordered<IoFuture>,
401 : ios_rx: IosRx,
402 : },
403 : ShuttingDown {
404 : futures: FuturesUnordered<IoFuture>,
405 : },
406 : }
407 337 : let mut state = State::Waiting {
408 337 : empty_futures: FuturesUnordered::new(),
409 337 : ios_rx,
410 337 : };
411 : loop {
412 22989 : match state {
413 : State::Waiting {
414 10405 : empty_futures,
415 10405 : mut ios_rx,
416 : } => {
417 10405 : assert!(empty_futures.is_empty());
418 10405 : tokio::select! {
419 10405 : fut = ios_rx.recv() => {
420 10386 : if let Some(fut) = fut {
421 10146 : trace!("received new io future");
422 10146 : empty_futures.push(fut);
423 10146 : state = State::Executing { futures: empty_futures, ios_rx };
424 : } else {
425 240 : state = State::ShuttingDown { futures: empty_futures }
426 : }
427 : }
428 : }
429 : }
430 : State::Executing {
431 12266 : mut futures,
432 12266 : mut ios_rx,
433 : } => {
434 12266 : tokio::select! {
435 12266 : res = futures.next() => {
436 11128 : trace!("io future completed");
437 11128 : assert!(res.is_some());
438 11128 : if futures.is_empty() {
439 10068 : state = State::Waiting { empty_futures: futures, ios_rx};
440 10068 : } else {
441 1060 : state = State::Executing { futures, ios_rx };
442 1060 : }
443 : }
444 12266 : fut = ios_rx.recv() => {
445 1138 : if let Some(fut) = fut {
446 1060 : trace!("received new io future");
447 1060 : futures.push(fut);
448 1060 : state = State::Executing { futures, ios_rx};
449 78 : } else {
450 78 : state = State::ShuttingDown { futures };
451 78 : }
452 : }
453 : }
454 : }
455 : State::ShuttingDown {
456 318 : mut futures,
457 : } => {
458 318 : trace!("shutting down");
459 396 : while let Some(()) = futures.next().await {
460 78 : trace!("io future completed (shutdown)");
461 : // drain
462 : }
463 318 : trace!("shutdown complete");
464 318 : break;
465 : }
466 : }
467 : }
468 318 : drop(gate_guard); // drop it right before we exit
469 338 : }.instrument(span));
470 338 : IoConcurrency::SidecarTask { task_id, ios_tx }
471 : }
472 : }
473 301616 : }
474 :
475 : /// Submit an IO to be executed in the background. DEADLOCK RISK, read the full doc string.
476 : ///
477 : /// The IO is represented as an opaque future.
478 : /// IO completion must be handled inside the future, e.g., through a oneshot channel.
479 : ///
480 : /// The API seems simple but there are multiple **pitfalls** involving
481 : /// DEADLOCK RISK.
482 : ///
483 : /// First, there are no guarantees about the execution of the IO.
484 : /// It may be `await`ed in-place before this function returns.
485 : /// It may be polled partially by this task and handed off to another task to be finished.
486 : /// It may be polled and then dropped before returning ready.
487 : ///
488 : /// This means that submitted IOs must not be interdependent.
489 : /// Interdependence may be through shared limited resources, e.g.,
490 : /// - VirtualFile file descriptor cache slot acquisition
491 : /// - tokio-epoll-uring slot
492 : ///
493 : /// # Why current usage is safe from deadlocks
494 : ///
495 : /// Textbook condition for a deadlock is that _all_ of the following be given
496 : /// - Mutual exclusion
497 : /// - Hold and wait
498 : /// - No preemption
499 : /// - Circular wait
500 : ///
501 : /// The current usage is safe because:
502 : /// - Mutual exclusion: IO futures definitely use mutexes, no way around that for now
503 : /// - Hold and wait: IO futures currently hold two kinds of locks/resources while waiting
504 : /// for acquisition of other resources:
505 : /// - VirtualFile file descriptor cache slot tokio mutex
506 : /// - tokio-epoll-uring slot (uses tokio notify => wait queue, much like mutex)
507 : /// - No preemption: there's no taking-away of acquired locks/resources => given
508 : /// - Circular wait: this is the part of the condition that isn't met: all IO futures
509 : /// first acquire VirtualFile mutex, then tokio-epoll-uring slot.
510 : /// There is no IO future that acquires slot before VirtualFile.
511 : /// Hence there can be no circular waiting.
512 : /// Hence there cannot be a deadlock.
513 : ///
514 : /// This is a very fragile situation and must be revisited whenever any code called from
515 : /// inside the IO futures is changed.
516 : ///
517 : /// We will move away from opaque IO futures towards well-defined IOs at some point in
518 : /// the future when we have shipped this first version of concurrent IO to production
519 : /// and are ready to retire the Sequential mode which runs the futures in place.
520 : /// Right now, while brittle, the opaque IO approach allows us to ship the feature
521 : /// with minimal changes to the code and minimal changes to existing behavior in Sequential mode.
522 : ///
523 : /// Also read the comment in `collect_pending_ios`.
524 412408 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
525 412408 : where
526 412408 : F: std::future::Future<Output = ()> + Send + 'static,
527 412408 : {
528 412408 : match self {
529 401202 : IoConcurrency::Sequential => fut.await,
530 11206 : IoConcurrency::SidecarTask { ios_tx, .. } => {
531 11206 : let fut = Box::pin(fut);
532 : // NB: experiments showed that doing an opportunistic poll of `fut` here was bad for throughput
533 : // while insignificant for latency.
534 : // It would make sense to revisit the tokio-epoll-uring API in the future such that we can try
535 : // a submission here, but never poll the future. That way, io_uring can make progress while
536 : // the future sits in the ios_tx queue.
537 11206 : match ios_tx.send(fut) {
538 11206 : Ok(()) => {}
539 : Err(_) => {
540 0 : unreachable!("the io task must have exited, likely it panicked")
541 : }
542 : }
543 : }
544 : }
545 412408 : }
546 :
547 : #[cfg(test)]
548 17 : pub(crate) fn spawn_for_test() -> impl std::ops::DerefMut<Target = Self> {
549 : use std::ops::{Deref, DerefMut};
550 :
551 : use tracing::info;
552 : use utils::sync::gate::Gate;
553 :
554 : // Spawn needs a Gate, give it one.
555 : struct Wrapper {
556 : inner: IoConcurrency,
557 : #[allow(dead_code)]
558 : gate: Box<Gate>,
559 : }
560 : impl Deref for Wrapper {
561 : type Target = IoConcurrency;
562 :
563 9246 : fn deref(&self) -> &Self::Target {
564 9246 : &self.inner
565 9246 : }
566 : }
567 : impl DerefMut for Wrapper {
568 0 : fn deref_mut(&mut self) -> &mut Self::Target {
569 0 : &mut self.inner
570 0 : }
571 : }
572 17 : let gate = Box::new(Gate::default());
573 :
574 : // The default behavior when running Rust unit tests without any further
575 : // flags is to use the new behavior.
576 : // The CI uses the following environment variable to unit test both old
577 : // and new behavior.
578 : // NB: the Python regression & perf tests take the `else` branch
579 : // below and have their own defaults management.
580 17 : let selected = {
581 : // The pageserver_api::config type is unsuitable because it's internally tagged.
582 0 : #[derive(serde::Deserialize)]
583 : #[serde(rename_all = "kebab-case")]
584 : enum TestOverride {
585 : Sequential,
586 : SidecarTask,
587 : }
588 : use once_cell::sync::Lazy;
589 17 : static TEST_OVERRIDE: Lazy<TestOverride> = Lazy::new(|| {
590 17 : utils::env::var_serde_json_string(
591 17 : "NEON_PAGESERVER_UNIT_TEST_GET_VECTORED_CONCURRENT_IO",
592 : )
593 17 : .unwrap_or(TestOverride::SidecarTask)
594 17 : });
595 :
596 17 : match *TEST_OVERRIDE {
597 0 : TestOverride::Sequential => SelectedIoConcurrency::Sequential,
598 : TestOverride::SidecarTask => {
599 17 : SelectedIoConcurrency::SidecarTask(gate.enter().expect("just created it"))
600 : }
601 : }
602 : };
603 :
604 17 : info!(?selected, "get_vectored_concurrent_io test");
605 :
606 17 : Wrapper {
607 17 : inner: Self::spawn(selected),
608 17 : gate,
609 17 : }
610 17 : }
611 : }
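    : // A usage sketch for `spawn_io` (hypothetical helper, not part of the real module):
    : // each submitted future is self-contained and reports its result through its own
    : // channel, much like `OnDiskValueIo::complete` does on the actual read path.
    : #[allow(dead_code)]
    : async fn spawn_io_usage_sketch(io_concurrency: &mut IoConcurrency) -> std::io::Result<Vec<u8>> {
    : let (tx, rx) = tokio::sync::oneshot::channel();
    : io_concurrency
    : .spawn_io(async move {
    : // Stand-in for the actual disk read; the future owns everything it needs
    : // and must not wait on other submitted IOs (see the deadlock discussion above).
    : let res: std::io::Result<Vec<u8>> = Ok(Vec::new());
    : let _ = tx.send(res);
    : })
    : .await;
    : rx.await.expect("the IO future always sends before finishing")
    : }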
612 :
613 : impl Clone for IoConcurrency {
614 19141 : fn clone(&self) -> Self {
615 19141 : match self {
616 0 : IoConcurrency::Sequential => IoConcurrency::Sequential,
617 19141 : IoConcurrency::SidecarTask { task_id, ios_tx } => IoConcurrency::SidecarTask {
618 19141 : task_id: *task_id,
619 19141 : ios_tx: ios_tx.clone(),
620 19141 : },
621 : }
622 19141 : }
623 : }
624 :
625 : /// Make noise in case the [`ValuesReconstructState`] gets dropped while
626 : /// there are still IOs in flight.
627 : /// Refer to `collect_pending_ios` for why we prefer not to do that.
628 : //
629 : /// We log from here instead of from the sidecar task because the [`ValuesReconstructState`]
630 : /// gets dropped in a tracing span with more context.
631 : /// We repeat the sidecar task's `task_id` so we can correlate what we emit here with
632 : /// the logs / panic handler logs from the sidecar task, which also logs the `task_id`.
633 : impl Drop for ValuesReconstructState {
634 312421 : fn drop(&mut self) {
635 312421 : let num_active_ios = self
636 312421 : .num_active_ios
637 312421 : .load(std::sync::atomic::Ordering::Acquire);
638 312421 : if num_active_ios == 0 {
639 312420 : return;
640 1 : }
641 1 : let sidecar_task_id = match &self.io_concurrency {
642 0 : IoConcurrency::Sequential => None,
643 1 : IoConcurrency::SidecarTask { task_id, .. } => Some(*task_id),
644 : };
645 1 : tracing::warn!(
646 : num_active_ios,
647 : ?sidecar_task_id,
648 0 : backtrace=%std::backtrace::Backtrace::force_capture(),
649 0 : "dropping ValuesReconstructState while some IOs have not been completed",
650 : );
651 312421 : }
652 : }
653 :
654 : impl ValuesReconstructState {
655 312421 : pub fn new(io_concurrency: IoConcurrency) -> Self {
656 312421 : Self {
657 312421 : keys: HashMap::new(),
658 312421 : keys_done: KeySpaceRandomAccum::new(),
659 312421 : keys_with_image_coverage: None,
660 312421 : layers_visited: 0,
661 312421 : delta_layers_visited: 0,
662 312421 : io_concurrency,
663 312421 : enable_debug: false,
664 312421 : debug_state: ValueReconstructState::default(),
665 312421 : num_active_ios: Arc::new(AtomicUsize::new(0)),
666 312421 : read_path: None,
667 312421 : }
668 312421 : }
669 :
670 0 : pub(crate) fn new_with_debug(io_concurrency: IoConcurrency) -> Self {
671 0 : Self {
672 0 : keys: HashMap::new(),
673 0 : keys_done: KeySpaceRandomAccum::new(),
674 0 : keys_with_image_coverage: None,
675 0 : layers_visited: 0,
676 0 : delta_layers_visited: 0,
677 0 : io_concurrency,
678 0 : enable_debug: true,
679 0 : debug_state: ValueReconstructState::default(),
680 0 : num_active_ios: Arc::new(AtomicUsize::new(0)),
681 0 : read_path: None,
682 0 : }
683 0 : }
684 :
685 : /// Absolutely read [`IoConcurrency::spawn_io`] to learn about assumptions & pitfalls.
686 412408 : pub(crate) async fn spawn_io<F>(&mut self, fut: F)
687 412408 : where
688 412408 : F: std::future::Future<Output = ()> + Send + 'static,
689 412408 : {
690 412408 : self.io_concurrency.spawn_io(fut).await;
691 412408 : }
692 :
693 0 : pub(crate) fn set_debug_state(&mut self, debug_state: &ValueReconstructState) {
694 0 : if self.enable_debug {
695 0 : self.debug_state = debug_state.clone();
696 0 : }
697 0 : }
698 :
699 446162 : pub(crate) fn on_layer_visited(&mut self, layer: &ReadableLayer) {
700 446162 : self.layers_visited += 1;
701 446162 : if let ReadableLayer::PersistentLayer(layer) = layer {
702 138845 : if layer.layer_desc().is_delta() {
703 124723 : self.delta_layers_visited += 1;
704 124723 : }
705 307317 : }
706 446162 : }
707 :
708 173 : pub(crate) fn get_delta_layers_visited(&self) -> u32 {
709 173 : self.delta_layers_visited
710 173 : }
711 :
712 312406 : pub(crate) fn get_layers_visited(&self) -> u32 {
713 312406 : self.layers_visited
714 312406 : }
715 :
716 : /// On hitting image layer, we can mark all keys in this range as done, because
717 : /// if the image layer does not contain a key, it is deleted/never added.
718 14128 : pub(crate) fn on_image_layer_visited(&mut self, key_range: &Range<Key>) {
719 14128 : let prev_val = self.keys_with_image_coverage.replace(key_range.clone());
720 14128 : assert_eq!(
721 : prev_val, None,
722 0 : "should consume the keyspace before the next iteration"
723 : );
724 14128 : }
725 :
726 : /// Update the state collected for a given key.
727 : /// Returns true if this was the last value needed for the key and false otherwise.
728 : ///
729 : /// If the key is done after the update, mark it as such.
730 : ///
731 : /// If the key is in the sparse keyspace (i.e., aux files), we do not track it in
732 : /// `keys_done`.
733 : // TODO: rename this method & update description.
734 1797835 : pub(crate) fn update_key(&mut self, key: &Key, lsn: Lsn, completes: bool) -> OnDiskValueIo {
735 1797835 : let state = self.keys.entry(*key).or_default();
736 :
737 1797835 : let is_sparse_key = key.is_sparse();
738 :
739 1797835 : let required_io = match state.situation {
740 : ValueReconstructSituation::Complete => {
741 44744 : if is_sparse_key {
742 : // Sparse keyspace might be visited multiple times because
743 : // we don't track unmapped keyspaces.
744 44744 : return OnDiskValueIo::Unnecessary;
745 : } else {
746 0 : unreachable!()
747 : }
748 : }
749 : ValueReconstructSituation::Continue => {
750 1753091 : self.num_active_ios
751 1753091 : .fetch_add(1, std::sync::atomic::Ordering::Release);
752 1753091 : let (tx, rx) = tokio::sync::oneshot::channel();
753 1753091 : state.on_disk_values.push((lsn, OnDiskValueIoWaiter { rx }));
754 1753091 : OnDiskValueIo::Required {
755 1753091 : tx,
756 1753091 : num_active_ios: Arc::clone(&self.num_active_ios),
757 1753091 : }
758 : }
759 : };
760 :
761 1753091 : if completes && state.situation == ValueReconstructSituation::Continue {
762 363571 : state.situation = ValueReconstructSituation::Complete;
763 363571 : if !is_sparse_key {
764 331685 : self.keys_done.add_key(*key);
765 331685 : }
766 1389520 : }
767 :
768 1753091 : required_io
769 1797835 : }
770 :
771 : /// Returns the key space describing the keys that have
772 : /// been marked as completed since the last call to this function.
773 : /// Returns individual keys done, and the image layer coverage.
774 446162 : pub(crate) fn consume_done_keys(&mut self) -> (KeySpace, Option<Range<Key>>) {
775 446162 : (
776 446162 : self.keys_done.consume_keyspace(),
777 446162 : self.keys_with_image_coverage.take(),
778 446162 : )
779 446162 : }
780 : }
781 :
782 : /// A key that uniquely identifies a layer in a timeline
783 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
784 : pub(crate) enum LayerId {
785 : PersitentLayerId(PersistentLayerKey),
786 : InMemoryLayerId(InMemoryLayerFileId),
787 : }
788 :
789 : /// Uniquely identify a layer visit by the layer
790 : /// and LSN range of the reads. Note that the end of the range is exclusive.
791 : ///
792 : /// The layer itself is not enough since we may have different LSN lower
793 : /// bounds for delta layer reads. Scenarios where this can happen are:
794 : ///
795 : /// 1. Layer overlaps: imagine an image layer inside and in-memory layer
796 : /// and a query that only partially hits the image layer. Part of the query
797 : /// needs to read the whole in-memory layer and the other part needs to read
798 : /// only up to the image layer. Hence, they'll have different LSN floor values
799 : /// for the read.
800 : ///
801 : /// 2. Scattered reads: the read path supports starting at different LSNs. Imagine
802 : /// the start LSN for one range is inside a layer and the start LSN for another range
803 : /// is above the layer (which includes all of it). Both ranges need to read the layer all the
804 : /// way to the end but starting at different points. Hence, they'll have different LSN
805 : /// ceil values.
806 : ///
807 : /// The implication is that we might visit the same layer multiple times
808 : /// in order to read different LSN ranges from it. In practice, this isn't very concerning
809 : /// because:
810 : /// 1. Layer overlaps are rare and generally not intended
811 : /// 2. Scattered reads will stabilise after the first few layers provided their starting LSNs
812 : /// are grouped tightly enough (likely the case).
813 : #[derive(Debug, PartialEq, Eq, Clone, Hash)]
814 : struct LayerToVisitId {
815 : layer_id: LayerId,
816 : lsn_floor: Lsn,
817 : lsn_ceil: Lsn,
818 : }
819 :
820 : #[derive(Debug, PartialEq, Eq, Hash)]
821 : pub enum ReadableLayerWeak {
822 : PersistentLayer(Arc<PersistentLayerDesc>),
823 : InMemoryLayer(InMemoryLayerDesc),
824 : }
825 :
826 : /// Layer wrapper for the read path. Note that it is valid
827 : /// to use these layers even after external operations have
828 : /// been performed on them (compaction, freeze, etc.).
829 : #[derive(Debug)]
830 : pub(crate) enum ReadableLayer {
831 : PersistentLayer(Layer),
832 : InMemoryLayer(Arc<InMemoryLayer>),
833 : }
834 :
835 : /// A partial description of a read to be done.
836 : #[derive(Debug, Clone)]
837 : struct LayerVisit {
838 : /// An id used to resolve the readable layer within the fringe
839 : layer_to_visit_id: LayerToVisitId,
840 : /// Lsn range for the read, used for selecting the next read
841 : lsn_range: Range<Lsn>,
842 : }
843 :
844 : /// Data structure which maintains a fringe of layers for the
845 : /// read path. The fringe is the set of layers which intersects
846 : /// the current keyspace that the search is descending on.
847 : /// Each layer tracks the keyspace that intersects it.
848 : ///
849 : /// The fringe must appear sorted by Lsn. Hence, it uses
850 : /// a two-layer indexing scheme.
851 : #[derive(Debug)]
852 : pub(crate) struct LayerFringe {
853 : planned_visits_by_lsn: BinaryHeap<LayerVisit>,
854 : visit_reads: HashMap<LayerToVisitId, LayerVisitReads>,
855 : }
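    : // Typical use on the timeline read path: layers intersecting the remaining keyspace
    : // are registered with `update`, and the search repeatedly pops the next visit with
    : // `next_layer`, which yields the visit with the highest `lsn_range.end` first
    : // (see the `Ord` impl for `LayerVisit` below).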
856 :
857 : #[derive(Debug)]
858 : struct LayerVisitReads {
859 : layer: ReadableLayer,
860 : target_keyspace: KeySpaceRandomAccum,
861 : }
862 :
863 : impl LayerFringe {
864 425394 : pub(crate) fn new() -> Self {
865 425394 : LayerFringe {
866 425394 : planned_visits_by_lsn: BinaryHeap::new(),
867 425394 : visit_reads: HashMap::new(),
868 425394 : }
869 425394 : }
870 :
871 871556 : pub(crate) fn next_layer(&mut self) -> Option<(ReadableLayer, KeySpace, Range<Lsn>)> {
872 871556 : let read_desc = self.planned_visits_by_lsn.pop()?;
873 :
874 446162 : let removed = self.visit_reads.remove_entry(&read_desc.layer_to_visit_id);
875 :
876 446162 : match removed {
877 : Some((
878 : _,
879 : LayerVisitReads {
880 446162 : layer,
881 446162 : mut target_keyspace,
882 : },
883 446162 : )) => Some((
884 446162 : layer,
885 446162 : target_keyspace.consume_keyspace(),
886 446162 : read_desc.lsn_range,
887 446162 : )),
888 0 : None => unreachable!("fringe internals are always consistent"),
889 : }
890 871556 : }
891 :
892 460553 : pub(crate) fn update(
893 460553 : &mut self,
894 460553 : layer: ReadableLayer,
895 460553 : keyspace: KeySpace,
896 460553 : lsn_range: Range<Lsn>,
897 460553 : ) {
898 460553 : let layer_to_visit_id = LayerToVisitId {
899 460553 : layer_id: layer.id(),
900 460553 : lsn_floor: lsn_range.start,
901 460553 : lsn_ceil: lsn_range.end,
902 460553 : };
903 :
904 460553 : let entry = self.visit_reads.entry(layer_to_visit_id.clone());
905 460553 : match entry {
906 14391 : Entry::Occupied(mut entry) => {
907 14391 : entry.get_mut().target_keyspace.add_keyspace(keyspace);
908 14391 : }
909 446162 : Entry::Vacant(entry) => {
910 446162 : self.planned_visits_by_lsn.push(LayerVisit {
911 446162 : lsn_range,
912 446162 : layer_to_visit_id: layer_to_visit_id.clone(),
913 446162 : });
914 446162 : let mut accum = KeySpaceRandomAccum::new();
915 446162 : accum.add_keyspace(keyspace);
916 446162 : entry.insert(LayerVisitReads {
917 446162 : layer,
918 446162 : target_keyspace: accum,
919 446162 : });
920 446162 : }
921 : }
922 460553 : }
923 : }
924 :
925 : impl Default for LayerFringe {
926 0 : fn default() -> Self {
927 0 : Self::new()
928 0 : }
929 : }
930 :
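    : // This ordering drives the `BinaryHeap` in `LayerFringe`: the heap is a max-heap, so
    : // `next_layer` pops the visit with the highest `lsn_range.end` first, and ties are
    : // broken in favour of the lower `lsn_range.start` (note the `reverse()` below). That
    : // is what keeps the fringe sorted by LSN, newest reads first.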
931 : impl Ord for LayerVisit {
932 89326 : fn cmp(&self, other: &Self) -> Ordering {
933 89326 : let ord = self.lsn_range.end.cmp(&other.lsn_range.end);
934 89326 : if ord == std::cmp::Ordering::Equal {
935 7257 : self.lsn_range.start.cmp(&other.lsn_range.start).reverse()
936 : } else {
937 82069 : ord
938 : }
939 89326 : }
940 : }
941 :
942 : impl PartialOrd for LayerVisit {
943 89326 : fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
944 89326 : Some(self.cmp(other))
945 89326 : }
946 : }
947 :
948 : impl PartialEq for LayerVisit {
949 0 : fn eq(&self, other: &Self) -> bool {
950 0 : self.lsn_range == other.lsn_range
951 0 : }
952 : }
953 :
954 : impl Eq for LayerVisit {}
955 :
956 : impl ReadableLayer {
957 460553 : pub(crate) fn id(&self) -> LayerId {
958 460553 : match self {
959 149392 : Self::PersistentLayer(layer) => LayerId::PersitentLayerId(layer.layer_desc().key()),
960 311161 : Self::InMemoryLayer(layer) => LayerId::InMemoryLayerId(layer.file_id()),
961 : }
962 460553 : }
963 :
964 446162 : pub(crate) async fn get_values_reconstruct_data(
965 446162 : &self,
966 446162 : keyspace: KeySpace,
967 446162 : lsn_range: Range<Lsn>,
968 446162 : reconstruct_state: &mut ValuesReconstructState,
969 446162 : ctx: &RequestContext,
970 446162 : ) -> Result<(), GetVectoredError> {
971 446162 : match self {
972 138845 : ReadableLayer::PersistentLayer(layer) => {
973 138845 : let ctx = RequestContextBuilder::from(ctx)
974 138845 : .perf_span(|crnt_perf_span| {
975 0 : info_span!(
976 : target: PERF_TRACE_TARGET,
977 0 : parent: crnt_perf_span,
978 : "PLAN_LAYER",
979 : layer = %layer
980 : )
981 0 : })
982 138845 : .attached_child();
983 :
984 138845 : layer
985 138845 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
986 138845 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
987 138845 : .await
988 : }
989 307317 : ReadableLayer::InMemoryLayer(layer) => {
990 307317 : let ctx = RequestContextBuilder::from(ctx)
991 307317 : .perf_span(|crnt_perf_span| {
992 0 : info_span!(
993 : target: PERF_TRACE_TARGET,
994 0 : parent: crnt_perf_span,
995 : "PLAN_LAYER",
996 : layer = %layer
997 : )
998 0 : })
999 307317 : .attached_child();
1000 :
1001 307317 : layer
1002 307317 : .get_values_reconstruct_data(keyspace, lsn_range, reconstruct_state, &ctx)
1003 307317 : .maybe_perf_instrument(&ctx, |crnt_perf_span| crnt_perf_span.clone())
1004 307317 : .await
1005 : }
1006 : }
1007 446162 : }
1008 : }
1009 :
1010 : /// Layers contain a hint indicating whether they are likely to be used for reads.
1011 : ///
1012 : /// This is a hint rather than an authoritative value, so that we do not have to update it synchronously
1013 : /// when changing the visibility of layers (for example when creating a branch that makes some previously
1014 : /// covered layers visible). It should be used for cache management but not for correctness-critical checks.
1015 : #[derive(Debug, Clone, PartialEq, Eq)]
1016 : pub enum LayerVisibilityHint {
1017 : /// A Visible layer might be read while serving a read, because there is not an image layer between it
1018 : /// and a readable LSN (the tip of the branch or a child's branch point)
1019 : Visible,
1020 : /// A Covered layer probably won't be read right now, but _can_ be read in future if someone creates
1021 : /// a branch or ephemeral endpoint at an LSN below the layer that covers this.
1022 : Covered,
1023 : }
1024 :
1025 : pub(crate) struct LayerAccessStats(std::sync::atomic::AtomicU64);
1026 :
1027 : #[derive(Clone, Copy, strum_macros::EnumString)]
1028 : pub(crate) enum LayerAccessStatsReset {
1029 : NoReset,
1030 : AllStats,
1031 : }
1032 :
1033 : impl Default for LayerAccessStats {
1034 991 : fn default() -> Self {
1035 : // Default value is to assume resident since creation time, and visible.
1036 991 : let (_mask, mut value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, SystemTime::now());
1037 991 : value |= 0x1 << Self::VISIBILITY_SHIFT;
1038 :
1039 991 : Self(std::sync::atomic::AtomicU64::new(value))
1040 991 : }
1041 : }
1042 :
1043 : // Efficient store of two very-low-resolution timestamps and some bits. Used for storing last access time and
1044 : // last residence change time.
1045 : impl LayerAccessStats {
1046 : // How many high bits to drop from a u32 timestamp?
1047 : // - Only storing up to a u32 timestamp will work fine until 2038 (if this code is still in use
1048 : // after that, this software has been very successful!)
1049 : // - Dropping the top bit is implicitly safe because unix timestamps are meant to be
1050 : // stored in an i32, so they never used it.
1051 : // - Dropping the next two bits is safe because this code is only running on systems in
1052 : // years >= 2024, and these bits have been 1 since 2021
1053 : //
1054 : // Therefore we may store only 29 bits for a timestamp with one-second resolution. We do
1055 : // this truncation to make space for some flags in the high bits of our u64.
1056 : const TS_DROP_HIGH_BITS: u32 = u32::count_ones(Self::TS_ONES) + 1;
1057 : const TS_MASK: u32 = 0x1f_ff_ff_ff;
1058 : const TS_ONES: u32 = 0x60_00_00_00;
1059 :
1060 : const ATIME_SHIFT: u32 = 0;
1061 : const RTIME_SHIFT: u32 = 32 - Self::TS_DROP_HIGH_BITS;
1062 : const VISIBILITY_SHIFT: u32 = 64 - 2 * Self::TS_DROP_HIGH_BITS;
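    : // Resulting u64 layout, derived from the constants above (TS_DROP_HIGH_BITS = 3,
    : // so each timestamp keeps 29 bits):
    : //
    : //   bits  0..=28   last-access timestamp            (ATIME_SHIFT = 0)
    : //   bits 29..=57   last-residence-change timestamp  (RTIME_SHIFT = 29)
    : //   bit  58        visibility flag, 1 = Visible     (VISIBILITY_SHIFT = 58)
    : //   bits 59..=63   currently unused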
1063 :
1064 137680 : fn write_bits(&self, mask: u64, value: u64) -> u64 {
1065 137680 : self.0
1066 137680 : .fetch_update(
1067 : // TODO: decide what orderings are correct
1068 137680 : std::sync::atomic::Ordering::Relaxed,
1069 137680 : std::sync::atomic::Ordering::Relaxed,
1070 137680 : |v| Some((v & !mask) | (value & mask)),
1071 : )
1072 137680 : .expect("Inner function is infallible")
1073 137680 : }
1074 :
1075 138474 : fn to_low_res_timestamp(shift: u32, time: SystemTime) -> (u64, u64) {
1076 : // Mask the timestamp down to the low `TS_MASK` bits (one-second resolution)
1077 138474 : let timestamp = time.duration_since(UNIX_EPOCH).unwrap().as_secs() & (Self::TS_MASK as u64);
1078 :
1079 138474 : ((Self::TS_MASK as u64) << shift, timestamp << shift)
1080 138474 : }
1081 :
1082 73 : fn read_low_res_timestamp(&self, shift: u32) -> Option<SystemTime> {
1083 73 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
1084 :
1085 73 : let ts_bits = (read & ((Self::TS_MASK as u64) << shift)) >> shift;
1086 73 : if ts_bits == 0 {
1087 33 : None
1088 : } else {
1089 40 : Some(UNIX_EPOCH + Duration::from_secs(ts_bits | (Self::TS_ONES as u64)))
1090 : }
1091 73 : }
1092 :
1093 : /// Record a change in layer residency.
1094 : ///
1095 : /// Recording the event must happen while holding the layer map lock to
1096 : /// ensure that latest-activity-threshold-based layer eviction (eviction_task.rs)
1097 : /// can do an "imitate access" to this layer, before it observes `now-latest_activity() > threshold`.
1098 : ///
1099 : /// If we instead recorded the residence event with a timestamp from before grabbing the layer map lock,
1100 : /// the following race could happen:
1101 : ///
1102 : /// - Compact: Write out an L1 layer from several L0 layers. This records residence event LayerCreate with the current timestamp.
1103 : /// - Eviction: imitate access logical size calculation. This accesses the L0 layers because the L1 layer is not yet in the layer map.
1104 : /// - Compact: Grab layer map lock, add the new L1 to layer map and remove the L0s, release layer map lock.
1105 : /// - Eviction: observes the new L1 layer whose only activity timestamp is the LayerCreate event.
1106 25 : pub(crate) fn record_residence_event_at(&self, now: SystemTime) {
1107 25 : let (mask, value) = Self::to_low_res_timestamp(Self::RTIME_SHIFT, now);
1108 25 : self.write_bits(mask, value);
1109 25 : }
1110 :
1111 24 : pub(crate) fn record_residence_event(&self) {
1112 24 : self.record_residence_event_at(SystemTime::now())
1113 24 : }
1114 :
1115 137458 : fn record_access_at(&self, now: SystemTime) -> bool {
1116 137458 : let (mut mask, mut value) = Self::to_low_res_timestamp(Self::ATIME_SHIFT, now);
1117 :
1118 : // A layer which is accessed must be visible.
1119 137458 : mask |= 0x1 << Self::VISIBILITY_SHIFT;
1120 137458 : value |= 0x1 << Self::VISIBILITY_SHIFT;
1121 :
1122 137458 : let old_bits = self.write_bits(mask, value);
1123 1 : !matches!(
1124 137458 : self.decode_visibility(old_bits),
1125 : LayerVisibilityHint::Visible
1126 : )
1127 137458 : }
1128 :
1129 : /// Returns true if we modified the layer's visibility to set it to Visible implicitly
1130 : /// as a result of this access
1131 138851 : pub(crate) fn record_access(&self, ctx: &RequestContext) -> bool {
1132 138851 : if ctx.access_stats_behavior() == AccessStatsBehavior::Skip {
1133 1396 : return false;
1134 137455 : }
1135 :
1136 137455 : self.record_access_at(SystemTime::now())
1137 138851 : }
1138 :
1139 0 : fn as_api_model(
1140 0 : &self,
1141 0 : reset: LayerAccessStatsReset,
1142 0 : ) -> pageserver_api::models::LayerAccessStats {
1143 0 : let ret = pageserver_api::models::LayerAccessStats {
1144 0 : access_time: self
1145 0 : .read_low_res_timestamp(Self::ATIME_SHIFT)
1146 0 : .unwrap_or(UNIX_EPOCH),
1147 0 : residence_time: self
1148 0 : .read_low_res_timestamp(Self::RTIME_SHIFT)
1149 0 : .unwrap_or(UNIX_EPOCH),
1150 0 : visible: matches!(self.visibility(), LayerVisibilityHint::Visible),
1151 : };
1152 0 : match reset {
1153 0 : LayerAccessStatsReset::NoReset => {}
1154 0 : LayerAccessStatsReset::AllStats => {
1155 0 : self.write_bits((Self::TS_MASK as u64) << Self::ATIME_SHIFT, 0x0);
1156 0 : self.write_bits((Self::TS_MASK as u64) << Self::RTIME_SHIFT, 0x0);
1157 0 : }
1158 : }
1159 0 : ret
1160 0 : }
1161 :
1162 : /// Get the latest access timestamp, falling back to latest residence event. The latest residence event
1163 : /// will be this Layer's construction time, if its residence hasn't changed since then.
1164 21 : pub(crate) fn latest_activity(&self) -> SystemTime {
1165 21 : if let Some(t) = self.read_low_res_timestamp(Self::ATIME_SHIFT) {
1166 3 : t
1167 : } else {
1168 18 : self.read_low_res_timestamp(Self::RTIME_SHIFT)
1169 18 : .expect("Residence time is set on construction")
1170 : }
1171 21 : }
1172 :
1173 : /// Whether this layer has been accessed (excluding in [`AccessStatsBehavior::Skip`]).
1174 : ///
1175 : /// This indicates whether the layer has been used for some purpose that would motivate
1176 : /// us to keep it on disk, such as for serving a getpage request.
1177 17 : fn accessed(&self) -> bool {
1178 : // Consider it accessed if the most recent access is more recent than
1179 : // the most recent change in residence status.
1180 : match (
1181 17 : self.read_low_res_timestamp(Self::ATIME_SHIFT),
1182 17 : self.read_low_res_timestamp(Self::RTIME_SHIFT),
1183 : ) {
1184 15 : (None, _) => false,
1185 0 : (Some(_), None) => true,
1186 2 : (Some(a), Some(r)) => a >= r,
1187 : }
1188 17 : }
1189 :
1190 : /// Helper for extracting the visibility hint from the literal value of our inner u64
1191 138059 : fn decode_visibility(&self, bits: u64) -> LayerVisibilityHint {
1192 138059 : match (bits >> Self::VISIBILITY_SHIFT) & 0x1 {
1193 138052 : 1 => LayerVisibilityHint::Visible,
1194 7 : 0 => LayerVisibilityHint::Covered,
1195 0 : _ => unreachable!(),
1196 : }
1197 138059 : }
1198 :
1199 : /// Returns the old value which has been replaced
1200 197 : pub(crate) fn set_visibility(&self, visibility: LayerVisibilityHint) -> LayerVisibilityHint {
1201 197 : let value = match visibility {
1202 180 : LayerVisibilityHint::Visible => 0x1 << Self::VISIBILITY_SHIFT,
1203 17 : LayerVisibilityHint::Covered => 0x0,
1204 : };
1205 :
1206 197 : let old_bits = self.write_bits(0x1 << Self::VISIBILITY_SHIFT, value);
1207 197 : self.decode_visibility(old_bits)
1208 197 : }
1209 :
1210 404 : pub(crate) fn visibility(&self) -> LayerVisibilityHint {
1211 404 : let read = self.0.load(std::sync::atomic::Ordering::Relaxed);
1212 404 : self.decode_visibility(read)
1213 404 : }
1214 : }
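    : // A small sketch (hypothetical test, not part of the original file) of the visibility
    : // bookkeeping above: a fresh stats value starts out Visible, covering it is remembered,
    : // and recording an access implicitly flips it back to Visible and reports that it did so.
    : #[cfg(test)]
    : #[test]
    : fn layer_access_stats_visibility_sketch() {
    : let stats = LayerAccessStats::default();
    : assert_eq!(stats.visibility(), LayerVisibilityHint::Visible);
    : assert_eq!(
    : stats.set_visibility(LayerVisibilityHint::Covered),
    : LayerVisibilityHint::Visible
    : );
    : assert!(stats.record_access_at(SystemTime::now()));
    : assert_eq!(stats.visibility(), LayerVisibilityHint::Visible);
    : }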
1215 :
1216 : /// Get a layer descriptor from a layer.
1217 : pub(crate) trait AsLayerDesc {
1218 : /// Get the layer descriptor.
1219 : fn layer_desc(&self) -> &PersistentLayerDesc;
1220 : }
1221 :
1222 : pub mod tests {
1223 : use pageserver_api::shard::TenantShardId;
1224 : use utils::id::TimelineId;
1225 :
1226 : use super::*;
1227 :
1228 : impl From<DeltaLayerName> for PersistentLayerDesc {
1229 11 : fn from(value: DeltaLayerName) -> Self {
1230 11 : PersistentLayerDesc::new_delta(
1231 11 : TenantShardId::from([0; 18]),
1232 11 : TimelineId::from_array([0; 16]),
1233 11 : value.key_range,
1234 11 : value.lsn_range,
1235 : 233,
1236 : )
1237 11 : }
1238 : }
1239 :
1240 : impl From<ImageLayerName> for PersistentLayerDesc {
1241 12 : fn from(value: ImageLayerName) -> Self {
1242 12 : PersistentLayerDesc::new_img(
1243 12 : TenantShardId::from([0; 18]),
1244 12 : TimelineId::from_array([0; 16]),
1245 12 : value.key_range,
1246 12 : value.lsn,
1247 : 233,
1248 : )
1249 12 : }
1250 : }
1251 :
1252 : impl From<LayerName> for PersistentLayerDesc {
1253 23 : fn from(value: LayerName) -> Self {
1254 23 : match value {
1255 11 : LayerName::Delta(d) => Self::from(d),
1256 12 : LayerName::Image(i) => Self::from(i),
1257 : }
1258 23 : }
1259 : }
1260 : }
1261 :
1262 : /// Range wrapping newtype, which uses display to render Debug.
1263 : ///
1264 : /// Useful with `Key`, which has too verbose `{:?}` for printing multiple layers.
1265 : struct RangeDisplayDebug<'a, T: std::fmt::Display>(&'a Range<T>);
1266 :
1267 : impl<T: std::fmt::Display> std::fmt::Debug for RangeDisplayDebug<'_, T> {
1268 0 : fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1269 0 : write!(f, "{}..{}", self.0.start, self.0.end)
1270 0 : }
1271 : }
1272 :
1273 : #[cfg(test)]
1274 : mod tests2 {
1275 : use pageserver_api::key::DBDIR_KEY;
1276 : use tracing::info;
1277 :
1278 : use super::*;
1279 : use crate::tenant::storage_layer::IoConcurrency;
1280 :
1281 : /// TODO: currently this test relies on manual visual inspection of the --no-capture output.
1282 : /// Should look like so:
1283 : /// ```text
1284 : /// RUST_LOG=trace cargo nextest run --features testing --no-capture test_io_concurrency_noise
1285 : /// running 1 test
1286 : /// 2025-01-21T17:42:01.335679Z INFO get_vectored_concurrent_io test selected=SidecarTask
1287 : /// 2025-01-21T17:42:01.335680Z TRACE spawning sidecar task task_id=0
1288 : /// 2025-01-21T17:42:01.335937Z TRACE IoConcurrency_sidecar{task_id=0}: start
1289 : /// 2025-01-21T17:42:01.335972Z TRACE IoConcurrency_sidecar{task_id=0}: received new io future
1290 : /// 2025-01-21T17:42:01.335999Z INFO IoConcurrency_sidecar{task_id=0}: waiting for signal to complete IO
1291 : /// 2025-01-21T17:42:01.336229Z WARN dropping ValuesReconstructState while some IOs have not been completed num_active_ios=1 sidecar_task_id=Some(0) backtrace= 0: <pageserver::tenant::storage_layer::ValuesReconstructState as core::ops::drop::Drop>::drop
1292 : /// at ./src/tenant/storage_layer.rs:553:24
1293 : /// 1: core::ptr::drop_in_place<pageserver::tenant::storage_layer::ValuesReconstructState>
1294 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/ptr/mod.rs:521:1
1295 : /// 2: core::mem::drop
1296 : /// at /home/christian/.rustup/toolchains/1.84.0-x86_64-unknown-linux-gnu/lib/rustlib/src/rust/library/core/src/mem/mod.rs:942:24
1297 : /// 3: pageserver::tenant::storage_layer::tests2::test_io_concurrency_noise::{{closure}}
1298 : /// at ./src/tenant/storage_layer.rs:1159:9
1299 : /// ...
1300 : /// 49: <unknown>
1301 : /// 2025-01-21T17:42:01.452293Z INFO IoConcurrency_sidecar{task_id=0}: completing IO
1302 : /// 2025-01-21T17:42:01.452357Z TRACE IoConcurrency_sidecar{task_id=0}: io future completed
1303 : /// 2025-01-21T17:42:01.452473Z TRACE IoConcurrency_sidecar{task_id=0}: end
1304 : /// test tenant::storage_layer::tests2::test_io_concurrency_noise ... ok
1305 : ///
1306 : /// ```
1307 : #[tokio::test]
1308 1 : async fn test_io_concurrency_noise() {
1309 1 : crate::tenant::harness::setup_logging();
1310 :
1311 1 : let io_concurrency = IoConcurrency::spawn_for_test();
1312 1 : match *io_concurrency {
1313 : IoConcurrency::Sequential => {
1314 : // This test asserts behavior in sidecar mode, doesn't make sense in sequential mode.
1315 0 : return;
1316 : }
1317 1 : IoConcurrency::SidecarTask { .. } => {}
1318 : }
1319 1 : let mut reconstruct_state = ValuesReconstructState::new(io_concurrency.clone());
1320 :
1321 1 : let (io_fut_is_waiting_tx, io_fut_is_waiting) = tokio::sync::oneshot::channel();
1322 1 : let (do_complete_io, should_complete_io) = tokio::sync::oneshot::channel();
1323 1 : let (io_fut_exiting_tx, io_fut_exiting) = tokio::sync::oneshot::channel();
1324 :
1325 1 : let io = reconstruct_state.update_key(&DBDIR_KEY, Lsn(8), true);
1326 1 : reconstruct_state
1327 1 : .spawn_io(async move {
1328 1 : info!("waiting for signal to complete IO");
1329 1 : io_fut_is_waiting_tx.send(()).unwrap();
1330 1 : should_complete_io.await.unwrap();
1331 1 : info!("completing IO");
1332 1 : io.complete(Ok(OnDiskValue::RawImage(Bytes::new())));
1333 1 : io_fut_exiting_tx.send(()).unwrap();
1334 1 : })
1335 1 : .await;
1336 :
1337 1 : io_fut_is_waiting.await.unwrap();
1338 :
1339 : // this is what makes the noise
1340 1 : drop(reconstruct_state);
1341 :
1342 1 : do_complete_io.send(()).unwrap();
1343 :
1344 1 : io_fut_exiting.await.unwrap();
1345 1 : }
1346 : }