Line data Source code
1 : use std::sync::Arc;
2 :
3 : use anyhow::{bail, Result};
4 : use camino::Utf8PathBuf;
5 :
6 : use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
7 : use tokio::{
8 : fs::OpenOptions,
9 : io::{AsyncSeekExt, AsyncWriteExt},
10 : };
11 : use tracing::{info, warn};
12 : use utils::{id::TenantTimelineId, lsn::Lsn};
13 :
14 : use crate::{
15 : control_file::{FileStorage, Storage},
16 : pull_timeline::{create_temp_timeline_dir, load_temp_timeline, validate_temp_timeline},
17 : state::TimelinePersistentState,
18 : timeline::{Timeline, TimelineError},
19 : wal_backup::copy_s3_segments,
20 : wal_storage::{wal_file_paths, WalReader},
21 : GlobalTimelines, SafeKeeperConf,
22 : };
23 :
24 : // we don't want to have more than 10 segments on disk after copy, because they take space
25 : const MAX_BACKUP_LAG: u64 = 10 * WAL_SEGMENT_SIZE as u64;
26 :
27 : pub struct Request {
28 : pub source: Arc<Timeline>,
29 : pub until_lsn: Lsn,
30 : pub destination_ttid: TenantTimelineId,
31 : }
32 :
33 48 : pub async fn handle_request(request: Request) -> Result<()> {
34 48 : // TODO: request.until_lsn MUST be a valid LSN, and we cannot check it :(
35 48 : // if LSN will point to the middle of a WAL record, timeline will be in "broken" state
36 48 :
37 48 : match GlobalTimelines::get(request.destination_ttid) {
38 : // timeline already exists. would be good to check that this timeline is the copy
39 : // of the source timeline, but it isn't obvious how to do that
40 0 : Ok(_) => return Ok(()),
41 : // timeline not found, we are going to create it
42 48 : Err(TimelineError::NotFound(_)) => {}
43 : // error, probably timeline was deleted
44 0 : res => {
45 0 : res?;
46 : }
47 : }
48 :
49 48 : let conf = &GlobalTimelines::get_global_config();
50 48 : let ttid = request.destination_ttid;
51 :
52 48 : let (_tmp_dir, tli_dir_path) = create_temp_timeline_dir(conf, ttid).await?;
53 :
54 48 : let (mem_state, state) = request.source.get_state().await;
55 48 : let start_lsn = state.timeline_start_lsn;
56 48 : if start_lsn == Lsn::INVALID {
57 0 : bail!("timeline is not initialized");
58 48 : }
59 48 : let backup_lsn = mem_state.backup_lsn;
60 48 :
61 48 : {
62 48 : let commit_lsn = mem_state.commit_lsn;
63 48 : let flush_lsn = request.source.get_flush_lsn().await;
64 :
65 48 : info!(
66 48 : "collected info about source timeline: start_lsn={}, backup_lsn={}, commit_lsn={}, flush_lsn={}",
67 48 : start_lsn, backup_lsn, commit_lsn, flush_lsn
68 48 : );
69 :
70 48 : assert!(backup_lsn >= start_lsn);
71 48 : assert!(commit_lsn >= start_lsn);
72 48 : assert!(flush_lsn >= start_lsn);
73 :
74 48 : if request.until_lsn > flush_lsn {
75 0 : bail!("requested LSN is beyond the end of the timeline");
76 48 : }
77 48 : if request.until_lsn < start_lsn {
78 0 : bail!("requested LSN is before the start of the timeline");
79 48 : }
80 48 :
81 48 : if request.until_lsn > commit_lsn {
82 1 : warn!("copy_timeline WAL is not fully committed");
83 47 : }
84 :
85 48 : if backup_lsn < request.until_lsn && request.until_lsn.0 - backup_lsn.0 > MAX_BACKUP_LAG {
86 : // we have a lot of segments that are not backed up. we can try to wait here until
87 : // segments will be backed up to remote storage, but it's not clear how long to wait
88 0 : bail!("too many segments are not backed up");
89 48 : }
90 48 : }
91 48 :
92 48 : let wal_seg_size = state.server.wal_seg_size as usize;
93 48 : if wal_seg_size == 0 {
94 0 : bail!("wal_seg_size is not set");
95 48 : }
96 48 :
97 48 : let first_segment = start_lsn.segment_number(wal_seg_size);
98 48 : let last_segment = request.until_lsn.segment_number(wal_seg_size);
99 :
100 48 : let new_backup_lsn = {
101 : // we can't have new backup_lsn greater than existing backup_lsn or start of the last segment
102 48 : let max_backup_lsn = backup_lsn.min(Lsn(last_segment * wal_seg_size as u64));
103 48 :
104 48 : if max_backup_lsn <= start_lsn {
105 : // probably we are starting from the first segment, which was not backed up yet.
106 : // note that start_lsn can be in the middle of the segment
107 33 : start_lsn
108 : } else {
109 : // we have some segments backed up, so we will assume all WAL below max_backup_lsn is backed up
110 15 : assert!(max_backup_lsn.segment_offset(wal_seg_size) == 0);
111 15 : max_backup_lsn
112 : }
113 : };
114 :
115 : // all previous segments will be copied inside S3
116 48 : let first_ondisk_segment = new_backup_lsn.segment_number(wal_seg_size);
117 48 : assert!(first_ondisk_segment <= last_segment);
118 48 : assert!(first_ondisk_segment >= first_segment);
119 :
120 48 : copy_s3_segments(
121 48 : wal_seg_size,
122 48 : &request.source.ttid,
123 48 : &request.destination_ttid,
124 48 : first_segment,
125 48 : first_ondisk_segment,
126 48 : )
127 239 : .await?;
128 :
129 48 : copy_disk_segments(
130 48 : conf,
131 48 : &state,
132 48 : wal_seg_size,
133 48 : &request.source.ttid,
134 48 : new_backup_lsn,
135 48 : request.until_lsn,
136 48 : &tli_dir_path,
137 48 : )
138 2156 : .await?;
139 :
140 48 : let mut new_state = TimelinePersistentState::new(
141 48 : &request.destination_ttid,
142 48 : state.server.clone(),
143 48 : vec![],
144 48 : request.until_lsn,
145 48 : start_lsn,
146 48 : );
147 48 : new_state.timeline_start_lsn = start_lsn;
148 48 : new_state.peer_horizon_lsn = request.until_lsn;
149 48 : new_state.backup_lsn = new_backup_lsn;
150 :
151 48 : let mut file_storage = FileStorage::create_new(tli_dir_path.clone(), conf, new_state.clone())?;
152 144 : file_storage.persist(&new_state).await?;
153 :
154 : // now we have a ready timeline in a temp directory
155 48 : validate_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
156 96 : load_temp_timeline(conf, request.destination_ttid, &tli_dir_path).await?;
157 :
158 48 : Ok(())
159 48 : }
160 :
161 48 : async fn copy_disk_segments(
162 48 : conf: &SafeKeeperConf,
163 48 : persisted_state: &TimelinePersistentState,
164 48 : wal_seg_size: usize,
165 48 : source_ttid: &TenantTimelineId,
166 48 : start_lsn: Lsn,
167 48 : end_lsn: Lsn,
168 48 : tli_dir_path: &Utf8PathBuf,
169 48 : ) -> Result<()> {
170 48 : let mut wal_reader = WalReader::new(
171 48 : conf.workdir.clone(),
172 48 : conf.timeline_dir(source_ttid),
173 48 : persisted_state,
174 48 : start_lsn,
175 48 : true,
176 48 : )?;
177 :
178 48 : let mut buf = [0u8; MAX_SEND_SIZE];
179 48 :
180 48 : let first_segment = start_lsn.segment_number(wal_seg_size);
181 48 : let last_segment = end_lsn.segment_number(wal_seg_size);
182 :
183 48 : for segment in first_segment..=last_segment {
184 48 : let segment_start = segment * wal_seg_size as u64;
185 48 : let segment_end = segment_start + wal_seg_size as u64;
186 48 :
187 48 : let copy_start = segment_start.max(start_lsn.0);
188 48 : let copy_end = segment_end.min(end_lsn.0);
189 48 :
190 48 : let copy_start = copy_start - segment_start;
191 48 : let copy_end = copy_end - segment_start;
192 :
193 48 : let wal_file_path = {
194 48 : let (normal, partial) = wal_file_paths(tli_dir_path, segment, wal_seg_size)?;
195 :
196 48 : if segment == last_segment {
197 48 : partial
198 : } else {
199 0 : normal
200 : }
201 : };
202 :
203 48 : write_segment(
204 48 : &mut buf,
205 48 : &wal_file_path,
206 48 : wal_seg_size as u64,
207 48 : copy_start,
208 48 : copy_end,
209 48 : &mut wal_reader,
210 48 : )
211 2156 : .await?;
212 : }
213 :
214 48 : Ok(())
215 48 : }
216 :
217 48 : async fn write_segment(
218 48 : buf: &mut [u8],
219 48 : file_path: &Utf8PathBuf,
220 48 : wal_seg_size: u64,
221 48 : from: u64,
222 48 : to: u64,
223 48 : reader: &mut WalReader,
224 48 : ) -> Result<()> {
225 48 : assert!(from <= to);
226 48 : assert!(to <= wal_seg_size);
227 :
228 48 : let mut file = OpenOptions::new()
229 48 : .create(true)
230 48 : .write(true)
231 48 : .open(&file_path)
232 48 : .await?;
233 :
234 : // maybe fill with zeros, as in wal_storage.rs?
235 48 : file.set_len(wal_seg_size).await?;
236 48 : file.seek(std::io::SeekFrom::Start(from)).await?;
237 :
238 48 : let mut bytes_left = to - from;
239 1287 : while bytes_left > 0 {
240 1239 : let len = bytes_left as usize;
241 1239 : let len = len.min(buf.len());
242 1341 : let len = reader.read(&mut buf[..len]).await?;
243 1239 : file.write_all(&buf[..len]).await?;
244 1239 : bytes_left -= len as u64;
245 : }
246 :
247 48 : file.flush().await?;
248 48 : file.sync_all().await?;
249 48 : Ok(())
250 48 : }
|