aboutsummaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'adenosine-pds/src/lib.rs')
-rw-r--r--adenosine-pds/src/lib.rs53
1 files changed, 34 insertions, 19 deletions
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 894f4a2..3d70d08 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -68,6 +68,7 @@ enum XrpcError {
BadRequest(String),
NotFound(String),
Forbidden(String),
+ MutexPoisoned,
}
impl std::error::Error for XrpcError {}
@@ -78,6 +79,7 @@ impl fmt::Display for XrpcError {
Self::BadRequest(msg) | Self::NotFound(msg) | Self::Forbidden(msg) => {
write!(f, "{}", msg)
}
+ Self::MutexPoisoned => write!(f, "service mutex poisoned"),
}
}
}
@@ -92,6 +94,8 @@ fn xrpc_wrap<S: serde::Serialize>(resp: Result<S>) -> Response {
Some(XrpcError::BadRequest(_)) => 400,
Some(XrpcError::NotFound(_)) => 404,
Some(XrpcError::Forbidden(_)) => 403,
+ // crash hard on mutex poison error
+ Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
None => 500,
};
warn!("HTTP {}: {}", code, msg);
@@ -111,13 +115,17 @@ fn web_wrap(resp: Result<String>) -> Response {
Some(XrpcError::BadRequest(_)) => 400,
Some(XrpcError::NotFound(_)) => 404,
Some(XrpcError::Forbidden(_)) => 403,
+ // crash hard on mutex poison error
+ Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
None => 500,
};
warn!("HTTP {}: {}", code, msg);
- Response::html(format!(
- "<html><body><h1>{}</h1><p>{}</body></html>",
- code, msg
- ))
+ let view = ErrorView {
+ domain: "ERROR".to_string(),
+ status_code: code,
+ error_message: msg,
+ };
+ Response::html(view.render().unwrap())
}
}
}
@@ -152,8 +160,14 @@ impl AtpService {
let config = self.config.clone();
let srv = Mutex::new(self);
- let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
- info!("{} {} ({:?})", req.method(), req.raw_url(), elap);
+ let log_ok = |req: &Request, resp: &Response, elap: std::time::Duration| {
+ info!(
+ "{} {} ({}, {:?})",
+ req.method(),
+ req.raw_url(),
+ resp.status_code,
+ elap
+ );
};
let log_err = |req: &Request, elap: std::time::Duration| {
error!(
@@ -164,7 +178,6 @@ impl AtpService {
);
};
- // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
rouille::start_server(config.listen_host_port, move |request| {
rouille::log_custom(request, log_ok, log_err, || {
router!(request,
@@ -225,6 +238,8 @@ impl AtpService {
Some(XrpcError::BadRequest(_)) => 400,
Some(XrpcError::NotFound(_)) => 404,
Some(XrpcError::Forbidden(_)) => 403,
+ // crash hard on mutex poison error
+ Some(XrpcError::MutexPoisoned) => std::process::exit(-1),
None => 500,
};
warn!("HTTP {}: {}", code, msg);
@@ -325,7 +340,7 @@ fn xrpc_get_handler(
) -> Result<serde_json::Value> {
match method {
"com.atproto.server.getAccountsConfig" => {
- let srv = srv.lock().expect("service mutex");
+ let srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let mut avail_domains = vec![];
if let Some(domain) = &srv.config.registration_domain {
avail_domains.push(domain)
@@ -338,7 +353,7 @@ fn xrpc_get_handler(
let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
let rkey = Tid::from_str(&xrpc_required_param(request, "rkey")?)?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let key = format!("/{}/{}", collection, rkey);
match srv.repo.get_atp_record(&did, &collection, &rkey) {
// TODO: format as JSON, not text debug
@@ -352,7 +367,7 @@ fn xrpc_get_handler(
}
"com.atproto.sync.getRoot" => {
let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
srv.repo
.lookup_commit(&did)?
.map(|v| json!({ "root": v }))
@@ -365,7 +380,7 @@ fn xrpc_get_handler(
let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
let mut record_list: Vec<Value> = vec![];
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
let last_commit = srv.repo.get_commit(commit_cid)?;
let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
@@ -384,7 +399,7 @@ fn xrpc_get_handler(
Ok(json!({ "records": record_list }))
}
"com.atproto.session.get" => {
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
let handle = srv
.atp_db
@@ -394,7 +409,7 @@ fn xrpc_get_handler(
}
"com.atproto.handle.resolve" => {
let handle = xrpc_required_param(request, "handle")?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
match srv.atp_db.resolve_handle(&handle)? {
Some(did) => Ok(json!({"did": did.to_string()})),
None => Err(XrpcError::NotFound(format!(
@@ -435,7 +450,7 @@ fn xrpc_get_handler(
fn xrpc_get_repo_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Vec<u8>> {
let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let commit_cid = srv.repo.lookup_commit(&did)?.unwrap();
Ok(srv.repo.export_car(&commit_cid, None)?)
}
@@ -667,7 +682,7 @@ fn account_view_handler(
request: &Request,
) -> Result<String> {
let host = request.header("Host").unwrap_or("localhost");
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
// TODO: unwrap as 404
let did = srv
.atp_db
@@ -694,7 +709,7 @@ fn thread_view_handler(
) -> Result<String> {
let host = request.header("Host").unwrap_or("localhost");
let collection = Nsid::from_str("app.bsky.feed.post")?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
// TODO: not unwrap
let did = srv.atp_db.resolve_handle(handle)?.unwrap();
@@ -714,7 +729,7 @@ fn repo_view_handler(srv: &Mutex<AtpService>, did: &str, request: &Request) -> R
let host = request.header("Host").unwrap_or("localhost");
let did = Did::from_str(did)?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let did_doc = srv.atp_db.get_did_doc(&did)?;
let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
let commit = srv.repo.get_commit(commit_cid)?;
@@ -747,7 +762,7 @@ fn collection_view_handler(
let collection = Nsid::from_str(collection)?;
let mut record_list: Vec<Value> = vec![];
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
let last_commit = srv.repo.get_commit(commit_cid)?;
let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
@@ -786,7 +801,7 @@ fn record_view_handler(
let collection = Nsid::from_str(collection)?;
let rkey = Tid::from_str(tid)?;
- let mut srv = srv.lock().expect("service mutex");
+ let mut srv = srv.lock().or(Err(XrpcError::MutexPoisoned))?;
let key = format!("/{}/{}", collection, rkey);
let record = match srv.repo.get_atp_record(&did, &collection, &rkey) {
Ok(Some(ipld)) => ipld_into_json_value(ipld),