aboutsummaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'adenosine-pds/src/lib.rs')
-rw-r--r--adenosine-pds/src/lib.rs205
1 files changed, 128 insertions, 77 deletions
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 009175d..3389281 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -8,6 +8,7 @@ use rouille::{router, Request, Response};
use serde_json::{json, Value};
use std::collections::BTreeMap;
use std::fmt;
+use std::io::Read;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Mutex;
@@ -30,12 +31,19 @@ pub use repo::{Mutation, RepoCommit, RepoStore};
pub use ucan_p256::P256KeyMaterial;
use web::*;
-struct AtpService {
+pub struct AtpService {
pub repo: RepoStore,
pub atp_db: AtpDatabase,
pub pds_keypair: KeyPair,
- pub pds_public_url: String,
pub tid_gen: TidLord,
+ pub config: AtpServiceConfig,
+}
+
+#[derive(Clone, Debug)]
+pub struct AtpServiceConfig {
+ pub listen_host_port: String,
+ pub public_url: String,
+ pub registration_domain: Option<String>,
}
#[derive(Debug)]
@@ -97,81 +105,105 @@ fn web_wrap(resp: Result<String>) -> Response {
}
}
-pub fn run_server(
- port: u16,
- blockstore_db_path: &PathBuf,
- atp_db_path: &PathBuf,
- keypair: KeyPair,
-) -> Result<()> {
- let srv = Mutex::new(AtpService {
- repo: RepoStore::open(blockstore_db_path)?,
- atp_db: AtpDatabase::open(atp_db_path)?,
- pds_keypair: keypair,
- pds_public_url: format!("http://localhost:{}", port),
- tid_gen: TidLord::new(),
- });
-
- let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
- info!("{} {} ({:?})", req.method(), req.raw_url(), elap);
- };
- let log_err = |req: &Request, elap: std::time::Duration| {
- error!(
- "HTTP handler panicked: {} {} ({:?})",
- req.method(),
- req.raw_url(),
- elap
- );
- };
-
- // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
- rouille::start_server(format!("localhost:{}", port), move |request| {
- rouille::log_custom(request, log_ok, log_err, || {
- router!(request,
- (GET) ["/"] => {
- let view = GenericHomeView { domain: "domain.todo".to_string() };
- Response::html(view.render().unwrap())
- },
- (GET) ["/about"] => {
- let view = AboutView { domain: "domain.todo".to_string() };
- Response::html(view.render().unwrap())
- },
- (GET) ["/robots.txt"] => {
- Response::text(include_str!("../templates/robots.txt"))
- },
- (GET) ["/u/{did}", did: Did] => {
- web_wrap(profile_handler(&srv, &did, request))
- },
- (GET) ["/u/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => {
- web_wrap(post_handler(&srv, &did, &collection, &tid, request))
- },
- (GET) ["/at/{did}", did: Did] => {
- web_wrap(repo_handler(&srv, &did, request))
- },
- (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => {
- web_wrap(collection_handler(&srv, &did, &collection, request))
- },
- (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => {
- web_wrap(record_handler(&srv, &did, &collection, &tid, request))
- },
- (GET) ["/static/adenosine.css"] => {
- Response::from_data("text/css", include_str!("../templates/adenosine.css"))
- },
- (GET) ["/static/favicon.png"] => {
- Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec())
- },
- (GET) ["/static/logo_128.png"] => {
- Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec())
- },
- (POST) ["/xrpc/{endpoint}", endpoint: String] => {
- xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request))
- },
- (GET) ["/xrpc/{endpoint}", endpoint: String] => {
- xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request))
- },
- _ => Response::text("404: Not Found")
- )
+impl AtpService {
+ pub fn new(
+ blockstore_db_path: &PathBuf,
+ atp_db_path: &PathBuf,
+ keypair: KeyPair,
+ config: AtpServiceConfig,
+ ) -> Result<Self> {
+ Ok(AtpService {
+ repo: RepoStore::open(blockstore_db_path)?,
+ atp_db: AtpDatabase::open(atp_db_path)?,
+ pds_keypair: keypair,
+ tid_gen: TidLord::new(),
+ config,
})
- });
+ }
+
+ pub fn run_server(self) -> Result<()> {
+ let config = self.config.clone();
+ let srv = Mutex::new(self);
+
+ let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
+ info!("{} {} ({:?})", req.method(), req.raw_url(), elap);
+ };
+ let log_err = |req: &Request, elap: std::time::Duration| {
+ error!(
+ "HTTP handler panicked: {} {} ({:?})",
+ req.method(),
+ req.raw_url(),
+ elap
+ );
+ };
+
+ // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
+ rouille::start_server(config.listen_host_port, move |request| {
+ rouille::log_custom(request, log_ok, log_err, || {
+ router!(request,
+ (GET) ["/"] => {
+ let view = GenericHomeView { domain: "domain.todo".to_string() };
+ Response::html(view.render().unwrap())
+ },
+ (GET) ["/about"] => {
+ let view = AboutView { domain: "domain.todo".to_string() };
+ Response::html(view.render().unwrap())
+ },
+ (GET) ["/robots.txt"] => {
+ Response::text(include_str!("../templates/robots.txt"))
+ },
+ (GET) ["/u/{did}", did: Did] => {
+ web_wrap(profile_handler(&srv, &did, request))
+ },
+ (GET) ["/u/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => {
+ web_wrap(post_handler(&srv, &did, &collection, &tid, request))
+ },
+ (GET) ["/at/{did}", did: Did] => {
+ web_wrap(repo_handler(&srv, &did, request))
+ },
+ (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => {
+ web_wrap(collection_handler(&srv, &did, &collection, request))
+ },
+ (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => {
+ web_wrap(record_handler(&srv, &did, &collection, &tid, request))
+ },
+ (GET) ["/static/adenosine.css"] => {
+ Response::from_data("text/css", include_str!("../templates/adenosine.css"))
+ },
+ (GET) ["/static/favicon.png"] => {
+ Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec())
+ },
+ (GET) ["/static/logo_128.png"] => {
+ Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec())
+ },
+ (POST) ["/xrpc/{endpoint}", endpoint: String] => {
+ xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request))
+ },
+ (GET) ["/xrpc/com.atproto.sync.getRepo"] => {
+ // this one endpoint returns CAR file, not JSON, so wrappers don't work
+ match xrpc_get_repo_handler(&srv, request) {
+ Ok(car_bytes) => Response::from_data("application/octet-stream", car_bytes),
+ Err(e) => {
+ let msg = e.to_string();
+ let code = match e.downcast_ref::<XrpcError>() {
+ Some(XrpcError::BadRequest(_)) => 400,
+ Some(XrpcError::NotFound(_)) => 404,
+ Some(XrpcError::Forbidden(_)) => 403,
+ None => 500,
+ };
+ warn!("HTTP {}: {}", code, msg);
+ Response::json(&json!({ "message": msg })).with_status_code(code)
+ }
+ }
+ },
+ (GET) ["/xrpc/{endpoint}", endpoint: String] => {
+ xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request))
+ },
+ _ => Response::text("404: Not Found")
+ )
+ })
+ });
+ }
}
/// Intentionally serializing with this instead of DAG-JSON, because ATP schemas don't encode CID
@@ -331,6 +363,13 @@ fn xrpc_get_handler(
}
}
+fn xrpc_get_repo_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Vec<u8>> {
+ let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
+ let mut srv = srv.lock().expect("service mutex");
+ let commit_cid = srv.repo.lookup_commit(&did)?.unwrap();
+ Ok(srv.repo.export_car(&commit_cid, None)?)
+}
+
fn xrpc_post_handler(
srv: &Mutex<AtpService>,
method: &str,
@@ -356,7 +395,7 @@ fn xrpc_post_handler(
// generate DID
let create_op = did::CreateOp::new(
req.handle.clone(),
- srv.pds_public_url.clone(),
+ srv.config.public_url.clone(),
&srv.pds_keypair,
req.recoveryKey.clone(),
);
@@ -504,6 +543,18 @@ fn xrpc_post_handler(
// TODO: next handle updates to database
Ok(json!({}))
}
+ "com.atproto.sync.updateRepo" => {
+ let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+ // important that this read is before we take the mutex, because it could be slow!
+ let mut car_bytes: Vec<u8> = Default::default();
+ // TODO: unwrap()
+ request.data().unwrap().read_to_end(&mut car_bytes)?;
+ let mut srv = srv.lock().unwrap();
+ let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+ srv.repo
+ .import_car_bytes(&car_bytes, Some(did.to_string()))?;
+ Ok(json!({}))
+ }
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: {}",
method