diff options
Diffstat (limited to 'adenosine-pds')
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 29 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 205 | 
2 files changed, 155 insertions, 79 deletions
| diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index f654dc1..3587896 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -54,6 +54,12 @@ enum Command {              hide_env_values = true          )]          pds_secret_key: String, + +        #[structopt(long = "--registration-domain", env = "ATP_PDS_REGISTRATION_DOMAIN")] +        registration_domain: Option<String>, + +        #[structopt(long = "--public-url", env = "ATP_PDS_PUBLIC_URL")] +        public_url: Option<String>,      },      /// Helper to import an IPLD CARv1 file in to sqlite data store @@ -98,10 +104,29 @@ fn main() -> Result<()> {          Command::Serve {              port,              pds_secret_key, +            registration_domain, +            public_url,          } => { -            // TODO: log some config stuff?              let keypair = KeyPair::from_hex(&pds_secret_key)?; -            run_server(port, &opt.blockstore_db_path, &opt.atp_db_path, keypair) +            // clean up config a bit +            let registration_domain = match registration_domain { +                None => None, +                Some(v) if v.is_empty() => None, +                Some(v) => Some(v), +            }; +            let public_url = match public_url { +                None => format!("http://localhost:{}", port), +                Some(v) if v.is_empty() => format!("http://localhost:{}", port), +                Some(v) => v, +            }; +            let config = AtpServiceConfig { +                listen_host_port: format!("localhost:{}", port), +                public_url: public_url, +                registration_domain: registration_domain, +            }; +            log::info!("PDS config: {:?}", config); +            let srv = AtpService::new(&opt.blockstore_db_path, &opt.atp_db_path, keypair, config)?; +            srv.run_server()          }          // TODO: handle alias          Command::Import { car_path, alias } => { diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 009175d..3389281 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -8,6 +8,7 @@ use rouille::{router, Request, Response};  use serde_json::{json, Value};  use std::collections::BTreeMap;  use std::fmt; +use std::io::Read;  use std::path::PathBuf;  use std::str::FromStr;  use std::sync::Mutex; @@ -30,12 +31,19 @@ pub use repo::{Mutation, RepoCommit, RepoStore};  pub use ucan_p256::P256KeyMaterial;  use web::*; -struct AtpService { +pub struct AtpService {      pub repo: RepoStore,      pub atp_db: AtpDatabase,      pub pds_keypair: KeyPair, -    pub pds_public_url: String,      pub tid_gen: TidLord, +    pub config: AtpServiceConfig, +} + +#[derive(Clone, Debug)] +pub struct AtpServiceConfig { +    pub listen_host_port: String, +    pub public_url: String, +    pub registration_domain: Option<String>,  }  #[derive(Debug)] @@ -97,81 +105,105 @@ fn web_wrap(resp: Result<String>) -> Response {      }  } -pub fn run_server( -    port: u16, -    blockstore_db_path: &PathBuf, -    atp_db_path: &PathBuf, -    keypair: KeyPair, -) -> Result<()> { -    let srv = Mutex::new(AtpService { -        repo: RepoStore::open(blockstore_db_path)?, -        atp_db: AtpDatabase::open(atp_db_path)?, -        pds_keypair: keypair, -        pds_public_url: format!("http://localhost:{}", port), -        tid_gen: TidLord::new(), -    }); - -    let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { -        info!("{} {} ({:?})", req.method(), req.raw_url(), elap); -    }; -    let log_err = |req: &Request, elap: std::time::Duration| { -        error!( -            "HTTP handler panicked: {} {} ({:?})", -            req.method(), -            req.raw_url(), -            elap -        ); -    }; - -    // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs -    rouille::start_server(format!("localhost:{}", port), move |request| { -        rouille::log_custom(request, log_ok, log_err, || { -            router!(request, -                (GET) ["/"] => { -                    let view = GenericHomeView { domain: "domain.todo".to_string() }; -                    Response::html(view.render().unwrap()) -                }, -                (GET) ["/about"] => { -                    let view = AboutView { domain: "domain.todo".to_string() }; -                    Response::html(view.render().unwrap()) -                }, -                (GET) ["/robots.txt"] => { -                    Response::text(include_str!("../templates/robots.txt")) -                }, -                (GET) ["/u/{did}", did: Did] => { -                    web_wrap(profile_handler(&srv, &did, request)) -                }, -                (GET) ["/u/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { -                    web_wrap(post_handler(&srv, &did, &collection, &tid, request)) -                }, -                (GET) ["/at/{did}", did: Did] => { -                    web_wrap(repo_handler(&srv, &did, request)) -                }, -                (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => { -                    web_wrap(collection_handler(&srv, &did, &collection, request)) -                }, -                (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { -                    web_wrap(record_handler(&srv, &did, &collection, &tid, request)) -                }, -                (GET) ["/static/adenosine.css"] => { -                    Response::from_data("text/css", include_str!("../templates/adenosine.css")) -                }, -                (GET) ["/static/favicon.png"] => { -                    Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec()) -                }, -                (GET) ["/static/logo_128.png"] => { -                    Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec()) -                }, -                (POST) ["/xrpc/{endpoint}", endpoint: String] => { -                    xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request)) -                }, -                (GET) ["/xrpc/{endpoint}", endpoint: String] => { -                    xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request)) -                }, -                _ => Response::text("404: Not Found") -            ) +impl AtpService { +    pub fn new( +        blockstore_db_path: &PathBuf, +        atp_db_path: &PathBuf, +        keypair: KeyPair, +        config: AtpServiceConfig, +    ) -> Result<Self> { +        Ok(AtpService { +            repo: RepoStore::open(blockstore_db_path)?, +            atp_db: AtpDatabase::open(atp_db_path)?, +            pds_keypair: keypair, +            tid_gen: TidLord::new(), +            config,          }) -    }); +    } + +    pub fn run_server(self) -> Result<()> { +        let config = self.config.clone(); +        let srv = Mutex::new(self); + +        let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { +            info!("{} {} ({:?})", req.method(), req.raw_url(), elap); +        }; +        let log_err = |req: &Request, elap: std::time::Duration| { +            error!( +                "HTTP handler panicked: {} {} ({:?})", +                req.method(), +                req.raw_url(), +                elap +            ); +        }; + +        // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs +        rouille::start_server(config.listen_host_port, move |request| { +            rouille::log_custom(request, log_ok, log_err, || { +                router!(request, +                    (GET) ["/"] => { +                        let view = GenericHomeView { domain: "domain.todo".to_string() }; +                        Response::html(view.render().unwrap()) +                    }, +                    (GET) ["/about"] => { +                        let view = AboutView { domain: "domain.todo".to_string() }; +                        Response::html(view.render().unwrap()) +                    }, +                    (GET) ["/robots.txt"] => { +                        Response::text(include_str!("../templates/robots.txt")) +                    }, +                    (GET) ["/u/{did}", did: Did] => { +                        web_wrap(profile_handler(&srv, &did, request)) +                    }, +                    (GET) ["/u/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { +                        web_wrap(post_handler(&srv, &did, &collection, &tid, request)) +                    }, +                    (GET) ["/at/{did}", did: Did] => { +                        web_wrap(repo_handler(&srv, &did, request)) +                    }, +                    (GET) ["/at/{did}/{collection}", did: Did, collection: Nsid] => { +                        web_wrap(collection_handler(&srv, &did, &collection, request)) +                    }, +                    (GET) ["/at/{did}/{collection}/{tid}", did: Did, collection: Nsid, tid: Tid] => { +                        web_wrap(record_handler(&srv, &did, &collection, &tid, request)) +                    }, +                    (GET) ["/static/adenosine.css"] => { +                        Response::from_data("text/css", include_str!("../templates/adenosine.css")) +                    }, +                    (GET) ["/static/favicon.png"] => { +                        Response::from_data("image/png", include_bytes!("../templates/favicon.png").to_vec()) +                    }, +                    (GET) ["/static/logo_128.png"] => { +                        Response::from_data("image/png", include_bytes!("../templates/logo_128.png").to_vec()) +                    }, +                    (POST) ["/xrpc/{endpoint}", endpoint: String] => { +                        xrpc_wrap(xrpc_post_handler(&srv, &endpoint, request)) +                    }, +                    (GET) ["/xrpc/com.atproto.sync.getRepo"] => { +                        // this one endpoint returns CAR file, not JSON, so wrappers don't work +                        match xrpc_get_repo_handler(&srv, request) { +                            Ok(car_bytes) => Response::from_data("application/octet-stream", car_bytes), +                            Err(e) => { +                                let msg = e.to_string(); +                                let code = match e.downcast_ref::<XrpcError>() { +                                    Some(XrpcError::BadRequest(_)) => 400, +                                    Some(XrpcError::NotFound(_)) => 404, +                                    Some(XrpcError::Forbidden(_)) => 403, +                                    None => 500, +                                }; +                                warn!("HTTP {}: {}", code, msg); +                                Response::json(&json!({ "message": msg })).with_status_code(code) +                            } +                        } +                    }, +                    (GET) ["/xrpc/{endpoint}", endpoint: String] => { +                        xrpc_wrap(xrpc_get_handler(&srv, &endpoint, request)) +                    }, +                    _ => Response::text("404: Not Found") +                ) +            }) +        }); +    }  }  /// Intentionally serializing with this instead of DAG-JSON, because ATP schemas don't encode CID @@ -331,6 +363,13 @@ fn xrpc_get_handler(      }  } +fn xrpc_get_repo_handler(srv: &Mutex<AtpService>, request: &Request) -> Result<Vec<u8>> { +    let did = Did::from_str(&xrpc_required_param(request, "user")?)?; +    let mut srv = srv.lock().expect("service mutex"); +    let commit_cid = srv.repo.lookup_commit(&did)?.unwrap(); +    Ok(srv.repo.export_car(&commit_cid, None)?) +} +  fn xrpc_post_handler(      srv: &Mutex<AtpService>,      method: &str, @@ -356,7 +395,7 @@ fn xrpc_post_handler(              // generate DID              let create_op = did::CreateOp::new(                  req.handle.clone(), -                srv.pds_public_url.clone(), +                srv.config.public_url.clone(),                  &srv.pds_keypair,                  req.recoveryKey.clone(),              ); @@ -504,6 +543,18 @@ fn xrpc_post_handler(              // TODO: next handle updates to database              Ok(json!({}))          } +        "com.atproto.sync.updateRepo" => { +            let did = Did::from_str(&xrpc_required_param(request, "did")?)?; +            // important that this read is before we take the mutex, because it could be slow! +            let mut car_bytes: Vec<u8> = Default::default(); +            // TODO: unwrap() +            request.data().unwrap().read_to_end(&mut car_bytes)?; +            let mut srv = srv.lock().unwrap(); +            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?; +            srv.repo +                .import_car_bytes(&car_bytes, Some(did.to_string()))?; +            Ok(json!({})) +        }          _ => Err(anyhow!(XrpcError::NotFound(format!(              "XRPC endpoint handler not found: {}",              method | 
