Line data Source code
1 : pub mod claims;
2 : use crate::claims::{DeletePrefixClaims, EndpointStorageClaims};
3 : use anyhow::Result;
4 : use axum::extract::{FromRequestParts, Path};
5 : use axum::response::{IntoResponse, Response};
6 : use axum::{RequestPartsExt, http::StatusCode, http::request::Parts};
7 : use axum_extra::TypedHeader;
8 : use axum_extra::headers::{Authorization, authorization::Bearer};
9 : use camino::Utf8PathBuf;
10 : use remote_storage::{GenericRemoteStorage, RemotePath};
11 : use serde::{Deserialize, Serialize};
12 : use std::fmt::Display;
13 : use std::result::Result as StdResult;
14 : use std::sync::Arc;
15 : use tokio_util::sync::CancellationToken;
16 : use tracing::{debug, error};
17 : use utils::auth::JwtAuth;
18 : use utils::id::{EndpointId, TenantId, TimelineId};
19 :
20 50 : fn normalize_key(key: &str) -> StdResult<Utf8PathBuf, String> {
21 50 : let key = clean_utf8(&Utf8PathBuf::from(key));
22 50 : if key.starts_with("..") || key == "." || key == "/" {
23 9 : return Err(format!("invalid key {key}"));
24 41 : }
25 41 : match key.strip_prefix("/").map(Utf8PathBuf::from) {
26 1 : Ok(p) => Ok(p),
27 40 : _ => Ok(key),
28 : }
29 50 : }
30 :
31 : // Copied from path_clean crate with PathBuf->Utf8PathBuf
32 50 : fn clean_utf8(path: &camino::Utf8Path) -> Utf8PathBuf {
33 : use camino::Utf8Component as Comp;
34 50 : let mut out = Vec::new();
35 82 : for comp in path.components() {
36 82 : match comp {
37 1 : Comp::CurDir => (),
38 18 : Comp::ParentDir => match out.last() {
39 1 : Some(Comp::RootDir) => (),
40 12 : Some(Comp::Normal(_)) => {
41 12 : out.pop();
42 12 : }
43 : None | Some(Comp::CurDir) | Some(Comp::ParentDir) | Some(Comp::Prefix(_)) => {
44 5 : out.push(comp)
45 : }
46 : },
47 63 : comp => out.push(comp),
48 : }
49 : }
50 50 : if !out.is_empty() {
51 48 : out.iter().collect()
52 : } else {
53 2 : Utf8PathBuf::from(".")
54 : }
55 50 : }
56 :
57 : pub struct Storage {
58 : pub auth: JwtAuth,
59 : pub storage: GenericRemoteStorage,
60 : pub cancel: CancellationToken,
61 : pub max_upload_file_limit: usize,
62 : }
63 :
64 6 : #[derive(Deserialize, Serialize)]
65 : struct KeyRequest {
66 : tenant_id: TenantId,
67 : timeline_id: TimelineId,
68 : endpoint_id: EndpointId,
69 : path: String,
70 : }
71 :
72 6 : #[derive(Deserialize, Serialize, PartialEq)]
73 : struct PrefixKeyRequest {
74 : tenant_id: TenantId,
75 : timeline_id: Option<TimelineId>,
76 : endpoint_id: Option<EndpointId>,
77 : }
78 :
79 : #[derive(Debug, PartialEq)]
80 : pub struct S3Path {
81 : pub path: RemotePath,
82 : }
83 :
84 : impl TryFrom<&KeyRequest> for S3Path {
85 : type Error = String;
86 41 : fn try_from(req: &KeyRequest) -> StdResult<Self, Self::Error> {
87 : let KeyRequest {
88 41 : tenant_id,
89 41 : timeline_id,
90 41 : endpoint_id,
91 41 : path,
92 41 : } = &req;
93 41 : let prefix = format!("{tenant_id}/{timeline_id}/{endpoint_id}",);
94 41 : let path = Utf8PathBuf::from(prefix).join(normalize_key(path)?);
95 38 : let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
96 38 : Ok(S3Path { path })
97 41 : }
98 : }
99 :
100 73 : fn unauthorized(route: impl Display, claims: impl Display) -> Response {
101 73 : debug!(%route, %claims, "route doesn't match claims");
102 73 : StatusCode::UNAUTHORIZED.into_response()
103 73 : }
104 :
105 14 : pub fn bad_request(err: impl Display, desc: &'static str) -> Response {
106 14 : debug!(%err, desc);
107 14 : (StatusCode::BAD_REQUEST, err.to_string()).into_response()
108 14 : }
109 :
110 21 : pub fn ok() -> Response {
111 21 : StatusCode::OK.into_response()
112 21 : }
113 :
114 0 : pub fn internal_error(err: impl Display, path: impl Display, desc: &'static str) -> Response {
115 0 : error!(%err, %path, desc);
116 0 : StatusCode::INTERNAL_SERVER_ERROR.into_response()
117 0 : }
118 :
119 15 : pub fn not_found(key: impl ToString) -> Response {
120 15 : (StatusCode::NOT_FOUND, key.to_string()).into_response()
121 0 : }
122 :
123 : impl FromRequestParts<Arc<Storage>> for S3Path {
124 : type Rejection = Response;
125 117 : async fn from_request_parts(
126 117 : parts: &mut Parts,
127 117 : state: &Arc<Storage>,
128 117 : ) -> Result<Self, Self::Rejection> {
129 117 : let Path(path): Path<KeyRequest> = parts
130 117 : .extract()
131 117 : .await
132 117 : .map_err(|e| bad_request(e, "invalid route"))?;
133 111 : let TypedHeader(Authorization(bearer)) = parts
134 111 : .extract::<TypedHeader<Authorization<Bearer>>>()
135 111 : .await
136 111 : .map_err(|e| bad_request(e, "invalid token"))?;
137 111 : let claims: EndpointStorageClaims = state
138 111 : .auth
139 111 : .decode(bearer.token())
140 111 : .map_err(|e| bad_request(e, "decoding token"))?
141 : .claims;
142 :
143 : // Read paths may have different endpoint ids. For readonly -> readwrite replica
144 : // prewarming, endpoint must read other endpoint's data.
145 111 : let endpoint_id = if parts.method == axum::http::Method::GET {
146 46 : claims.endpoint_id.clone()
147 : } else {
148 65 : path.endpoint_id.clone()
149 : };
150 :
151 111 : let route = EndpointStorageClaims {
152 111 : tenant_id: path.tenant_id,
153 111 : timeline_id: path.timeline_id,
154 111 : endpoint_id,
155 111 : exp: claims.exp,
156 111 : };
157 111 : if route != claims {
158 73 : return Err(unauthorized(route, claims));
159 38 : }
160 38 : (&path)
161 38 : .try_into()
162 38 : .map_err(|e| bad_request(e, "invalid route"))
163 117 : }
164 : }
165 :
166 : #[derive(Debug, PartialEq)]
167 : pub struct PrefixS3Path {
168 : pub path: RemotePath,
169 : }
170 :
171 : impl From<&DeletePrefixClaims> for PrefixS3Path {
172 9 : fn from(path: &DeletePrefixClaims) -> Self {
173 9 : let timeline_id = path
174 9 : .timeline_id
175 9 : .as_ref()
176 9 : .map(ToString::to_string)
177 9 : .unwrap_or("".to_string());
178 9 : let endpoint_id = path
179 9 : .endpoint_id
180 9 : .as_ref()
181 9 : .map(ToString::to_string)
182 9 : .unwrap_or("".to_string());
183 9 : let path = Utf8PathBuf::from(path.tenant_id.to_string())
184 9 : .join(timeline_id)
185 9 : .join(endpoint_id);
186 9 : let path = RemotePath::new(&path).unwrap(); // unwrap() because the path is already relative
187 9 : PrefixS3Path { path }
188 9 : }
189 : }
190 :
191 : impl FromRequestParts<Arc<Storage>> for PrefixS3Path {
192 : type Rejection = Response;
193 12 : async fn from_request_parts(
194 12 : parts: &mut Parts,
195 12 : state: &Arc<Storage>,
196 12 : ) -> Result<Self, Self::Rejection> {
197 12 : let Path(path) = parts
198 12 : .extract::<Path<PrefixKeyRequest>>()
199 12 : .await
200 12 : .map_err(|e| bad_request(e, "invalid route"))?;
201 6 : let TypedHeader(Authorization(bearer)) = parts
202 6 : .extract::<TypedHeader<Authorization<Bearer>>>()
203 6 : .await
204 6 : .map_err(|e| bad_request(e, "invalid token"))?;
205 6 : let claims: DeletePrefixClaims = state
206 6 : .auth
207 6 : .decode(bearer.token())
208 6 : .map_err(|e| bad_request(e, "invalid token"))?
209 : .claims;
210 6 : let route = DeletePrefixClaims {
211 6 : tenant_id: path.tenant_id,
212 6 : timeline_id: path.timeline_id,
213 6 : endpoint_id: path.endpoint_id,
214 6 : exp: claims.exp,
215 6 : };
216 6 : if route != claims {
217 0 : return Err(unauthorized(route, claims));
218 6 : }
219 6 : Ok((&route).into())
220 12 : }
221 : }
222 :
223 : #[cfg(test)]
224 : mod tests {
225 : use super::*;
226 :
227 : #[test]
228 1 : fn normalize_key() {
229 1 : let f = super::normalize_key;
230 1 : assert_eq!(f("hello/world/..").unwrap(), Utf8PathBuf::from("hello"));
231 1 : assert_eq!(
232 1 : f("ololo/1/../../not_ololo").unwrap(),
233 1 : Utf8PathBuf::from("not_ololo")
234 : );
235 1 : assert!(f("ololo/1/../../../").is_err());
236 1 : assert!(f(".").is_err());
237 1 : assert!(f("../").is_err());
238 1 : assert!(f("").is_err());
239 1 : assert_eq!(f("/1/2/3").unwrap(), Utf8PathBuf::from("1/2/3"));
240 1 : assert!(f("/1/2/3/../../../").is_err());
241 1 : assert!(f("/1/2/3/../../../../").is_err());
242 1 : }
243 :
244 : const TENANT_ID: TenantId =
245 : TenantId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6]);
246 : const TIMELINE_ID: TimelineId =
247 : TimelineId::from_array([1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 7]);
248 : const ENDPOINT_ID: &str = "ep-winter-frost-a662z3vg";
249 :
250 : #[test]
251 1 : fn s3_path() {
252 1 : let auth = EndpointStorageClaims {
253 1 : tenant_id: TENANT_ID,
254 1 : timeline_id: TIMELINE_ID,
255 1 : endpoint_id: ENDPOINT_ID.into(),
256 1 : exp: u64::MAX,
257 1 : };
258 2 : let s3_path = |key| {
259 2 : let path = &format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}/{key}");
260 2 : let path = RemotePath::from_string(path).unwrap();
261 2 : S3Path { path }
262 2 : };
263 :
264 1 : let path = "cache_key".to_string();
265 1 : let mut key_path = KeyRequest {
266 1 : path,
267 1 : tenant_id: auth.tenant_id,
268 1 : timeline_id: auth.timeline_id,
269 1 : endpoint_id: auth.endpoint_id,
270 1 : };
271 1 : assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
272 :
273 1 : key_path.path = "we/can/have/nested/paths".to_string();
274 1 : assert_eq!(S3Path::try_from(&key_path).unwrap(), s3_path(key_path.path));
275 :
276 1 : key_path.path = "../error/hello/../".to_string();
277 1 : assert!(S3Path::try_from(&key_path).is_err());
278 1 : }
279 :
280 : #[test]
281 1 : fn prefix_s3_path() {
282 1 : let mut path = DeletePrefixClaims {
283 1 : tenant_id: TENANT_ID,
284 1 : timeline_id: None,
285 1 : endpoint_id: None,
286 1 : exp: 0,
287 1 : };
288 3 : let prefix_path = |s: String| RemotePath::from_string(&s).unwrap();
289 1 : assert_eq!(
290 1 : PrefixS3Path::from(&path).path,
291 1 : prefix_path(format!("{TENANT_ID}"))
292 : );
293 :
294 1 : path.timeline_id = Some(TIMELINE_ID);
295 1 : assert_eq!(
296 1 : PrefixS3Path::from(&path).path,
297 1 : prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}"))
298 : );
299 :
300 1 : path.endpoint_id = Some(ENDPOINT_ID.into());
301 1 : assert_eq!(
302 1 : PrefixS3Path::from(&path).path,
303 1 : prefix_path(format!("{TENANT_ID}/{TIMELINE_ID}/{ENDPOINT_ID}"))
304 : );
305 1 : }
306 : }
|