From 79869226250beff62dde2c57a8e6e16eaa893b75 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 7 Nov 2022 18:01:20 -0800 Subject: pds: add some more config --- adenosine-pds/src/bin/adenosine-pds.rs | 29 ++++- adenosine-pds/src/lib.rs | 205 ++++++++++++++++++++------------- 2 files changed, 155 insertions(+), 79 deletions(-) diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index f654dc1..3587896 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -54,6 +54,12 @@ enum Command { hide_env_values = true )] pds_secret_key: String, + + #[structopt(long = "--registration-domain", env = "ATP_PDS_REGISTRATION_DOMAIN")] + registration_domain: Option, + + #[structopt(long = "--public-url", env = "ATP_PDS_PUBLIC_URL")] + public_url: Option, }, /// Helper to import an IPLD CARv1 file in to sqlite data store @@ -98,10 +104,29 @@ fn main() -> Result<()> { Command::Serve { port, pds_secret_key, + registration_domain, + public_url, } => { - // TODO: log some config stuff? let keypair = KeyPair::from_hex(&pds_secret_key)?; - run_server(port, &opt.blockstore_db_path, &opt.atp_db_path, keypair) + // clean up config a bit + let registration_domain = match registration_domain { + None => None, + Some(v) if v.is_empty() => None, + Some(v) => Some(v), + }; + let public_url = match public_url { + None => format!("http://localhost:{}", port), + Some(v) if v.is_empty() => format!("http://localhost:{}", port), + Some(v) => v, + }; + let config = AtpServiceConfig { + listen_host_port: format!("localhost:{}", port), + public_url: public_url, + registration_domain: registration_domain, + }; + log::info!("PDS config: {:?}", config); + let srv = AtpService::new(&opt.blockstore_db_path, &opt.atp_db_path, keypair, config)?; + srv.run_server() } // TODO: handle alias Command::Import { car_path, alias } => { diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 009175d..3389281 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -8,6 +8,7 @@ use rouille::{router, Request, Response}; use serde_json::{json, Value}; use std::collections::BTreeMap; use std::fmt; +use std::io::Read; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; @@ -30,12 +31,19 @@ pub use repo::{Mutation, RepoCommit, RepoStore}; pub use ucan_p256::P256KeyMaterial; use web::*; -struct AtpService { +pub struct AtpService { pub repo: RepoStore, pub atp_db: AtpDatabase, pub pds_keypair: KeyPair, - pub pds_public_url: String, pub tid_gen: TidLord, + pub config: AtpServiceConfig, +} + +#[derive(Clone, Debug)] +pub struct AtpServiceConfig { + pub listen_host_port: String, + pub public_url: String, + pub registration_domain: Option, } #[derive(Debug)] @@ -97,81 +105,105 @@ fn web_wrap(resp: Result) -> Response { } } -pub fn run_server( - port: u16, - blockstore_db_path: &PathBuf, - atp_db_path: &PathBuf, - keypair: KeyPair, -) -> Result<()> { - let srv = Mutex::new(AtpService { - repo: RepoStore::open(blockstore_db_path)?, - atp_db: AtpDatabase::open(atp_db_path)?, - pds_keypair: keypair, - pds_public_url: format!("http://localhost:{}", port), - tid_gen: TidLord::new(), - }); - - let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { - info!("{} {} ({:?})", req.method(), req.raw_url(), elap); - }; - let log_err = |req: &Request, elap: std::time::Duration| { - error!( - "HTTP handler panicked: {} {} ({:?})", - req.method(), - req.raw_url(), - elap - ); - }; - - // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs - rouille::start_server(format!("localhost:{}", port), move |request| { - rouille::log_custom(request, log_ok, log_err, || { - router!(request, - (GET) ["/"] => { - let view = GenericHomeView { domain: "domain.todo".to_string() }; - Response::html(view.render().unwrap()) - }, - (GET) ["/about"] => { - let view = AboutView { domain: "domain.todo".to_string() }; - Response::html(view.render().unwrap()) - }, - (GET) ["/robots.txt"] => { - Response::text(include_str!("../templates/robots.txt")) - }, - (GET) ["/u/{did}", did: Did] => { - web_wrap(profile_handler(&srv, &did, request)) - }, - (GET) ["/u/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { - web_wrap(post_handler(&srv, &did, &collection, &tid, request)) - }, - (GET) ["/at/{did}", did: Did] => { - web_wrap(repo_handler(&srv, &did, request)) - }, - (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => { - web_wrap(collection_handler(&srv, &did, &collection, request)) - }, - (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { - web_wrap(record_handler(&srv, &did, &collection, &tid, request)) - }, - (GET) ["/static/adenosine.css"] => { - Response::from_data("text/css", include_str!("../templates/adenosine.css")) - }, - (GET) ["/static/favicon.png"] => { - Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec()) - }, - (GET) ["/static/logo_128.png"] => { - Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec()) - }, - (POST) ["/xrpc/{endpoint}", endpoint: String] => { - xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request)) - }, - (GET) ["/xrpc/{endpoint}", endpoint: String] => { - xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request)) - }, - _ => Response::text("404: Not Found") - ) +impl AtpService { + pub fn new( + blockstore_db_path: &PathBuf, + atp_db_path: &PathBuf, + keypair: KeyPair, + config: AtpServiceConfig, + ) -> Result { + Ok(AtpService { + repo: RepoStore::open(blockstore_db_path)?, + atp_db: AtpDatabase::open(atp_db_path)?, + pds_keypair: keypair, + tid_gen: TidLord::new(), + config, }) - }); + } + + pub fn run_server(self) -> Result<()> { + let config = self.config.clone(); + let srv = Mutex::new(self); + + let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { + info!("{} {} ({:?})", req.method(), req.raw_url(), elap); + }; + let log_err = |req: &Request, elap: std::time::Duration| { + error!( + "HTTP handler panicked: {} {} ({:?})", + req.method(), + req.raw_url(), + elap + ); + }; + + // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs + rouille::start_server(config.listen_host_port, move |request| { + rouille::log_custom(request, log_ok, log_err, || { + router!(request, + (GET) ["/"] => { + let view = GenericHomeView { domain: "domain.todo".to_string() }; + Response::html(view.render().unwrap()) + }, + (GET) ["/about"] => { + let view = AboutView { domain: "domain.todo".to_string() }; + Response::html(view.render().unwrap()) + }, + (GET) ["/robots.txt"] => { + Response::text(include_str!("../templates/robots.txt")) + }, + (GET) ["/u/{did}", did: Did] => { + web_wrap(profile_handler(&srv, &did, request)) + }, + (GET) ["/u/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { + web_wrap(post_handler(&srv, &did, &collection, &tid, request)) + }, + (GET) ["/at/{did}", did: Did] => { + web_wrap(repo_handler(&srv, &did, request)) + }, + (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => { + web_wrap(collection_handler(&srv, &did, &collection, request)) + }, + (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { + web_wrap(record_handler(&srv, &did, &collection, &tid, request)) + }, + (GET) ["/static/adenosine.css"] => { + Response::from_data("text/css", include_str!("../templates/adenosine.css")) + }, + (GET) ["/static/favicon.png"] => { + Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec()) + }, + (GET) ["/static/logo_128.png"] => { + Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec()) + }, + (POST) ["/xrpc/{endpoint}", endpoint: String] => { + xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request)) + }, + (GET) ["/xrpc/com.atproto.sync.getRepo"] => { + // this one endpoint returns CAR file, not JSON, so wrappers don't work + match xrpc_get_repo_handler(&srv, request) { + Ok(car_bytes) => Response::from_data("application/octet-stream", car_bytes), + Err(e) => { + let msg = e.to_string(); + let code = match e.downcast_ref::() { + Some(XrpcError::BadRequest(_)) => 400, + Some(XrpcError::NotFound(_)) => 404, + Some(XrpcError::Forbidden(_)) => 403, + None => 500, + }; + warn!("HTTP {}: {}", code, msg); + Response::json(&json!({ "message": msg })).with_status_code(code) + } + } + }, + (GET) ["/xrpc/{endpoint}", endpoint: String] => { + xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request)) + }, + _ => Response::text("404: Not Found") + ) + }) + }); + } } /// Intentionally serializing with this instead of DAG-JSON, because ATP schemas don't encode CID @@ -331,6 +363,13 @@ fn xrpc_get_handler( } } +fn xrpc_get_repo_handler(srv: &Mutex, request: &Request) -> Result> { + let did = Did::from_str(&xrpc_required_param(request, "user")?)?; + let mut srv = srv.lock().expect("service mutex"); + let commit_cid = srv.repo.lookup_commit(&did)?.unwrap(); + Ok(srv.repo.export_car(&commit_cid, None)?) +} + fn xrpc_post_handler( srv: &Mutex, method: &str, @@ -356,7 +395,7 @@ fn xrpc_post_handler( // generate DID let create_op = did::CreateOp::new( req.handle.clone(), - srv.pds_public_url.clone(), + srv.config.public_url.clone(), &srv.pds_keypair, req.recoveryKey.clone(), ); @@ -504,6 +543,18 @@ fn xrpc_post_handler( // TODO: next handle updates to database Ok(json!({})) } + "com.atproto.sync.updateRepo" => { + let did = Did::from_str(&xrpc_required_param(request, "did")?)?; + // important that this read is before we take the mutex, because it could be slow! + let mut car_bytes: Vec = Default::default(); + // TODO: unwrap() + request.data().unwrap().read_to_end(&mut car_bytes)?; + let mut srv = srv.lock().unwrap(); + let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?; + srv.repo + .import_car_bytes(&car_bytes, Some(did.to_string()))?; + Ok(json!({})) + } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: {}", method -- cgit v1.2.3