From 5a9333475518c3a7d9396d120e07d813252a0a09 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 2 Nov 2022 01:39:19 -0700 Subject: pds: refactoring --- adenosine-pds/src/bin/adenosine-pds.rs | 8 ++- adenosine-pds/src/db.rs | 2 + adenosine-pds/src/lib.rs | 120 +++++++++++++++++++++++++++------ adenosine-pds/src/mst.rs | 19 ++++-- adenosine-pds/src/repo.rs | 8 ++- 5 files changed, 129 insertions(+), 28 deletions(-) diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index 0159ab7..cfae0ca 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -51,6 +51,9 @@ enum Command { Import { /// CARv1 file path to import from car_path: std::path::PathBuf, + + /// name of pointer to root of CAR DAG tree. Usually a DID + alias: String, }, /// Helper to print MST keys/docs from a sqlite repo @@ -82,7 +85,10 @@ fn main() -> Result<()> { // TODO: log some config stuff? run_server(port, &opt.blockstore_db_path, &opt.atp_db_path) } - Command::Import { car_path } => load_car_to_sqlite(&opt.blockstore_db_path, &car_path), + // TODO: handle alias + Command::Import { car_path, alias } => { + load_car_to_sqlite(&opt.blockstore_db_path, &car_path) + } Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path), } } diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 905210c..35c798d 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -2,6 +2,7 @@ use crate::AtpSession; /// ATP database (as distinct from blockstore) use anyhow::{anyhow, Result}; use lazy_static::lazy_static; +use log::debug; use rusqlite::{params, Connection}; use rusqlite_migration::{Migrations, M}; use serde_json::Value; @@ -107,6 +108,7 @@ impl AtpDatabase { // TODO: insert did_doc // TODO: also need to initialize repo with... profile? { + debug!("bcrypt hashing password (can be slow)..."); let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?; let signing_key = "key:TODO"; let did = "did:TODO"; diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 0ef081d..e52cb69 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,6 +1,8 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use log::{error, info}; -use rouille::{router, Request, Response}; +use rouille::{router, try_or_400, Request, Response}; +use serde_json::json; +use std::fmt; use std::path::PathBuf; use std::sync::Mutex; @@ -17,13 +19,61 @@ pub use db::AtpDatabase; pub use models::*; pub use repo::{RepoCommit, RepoStore}; +#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +struct AccountRequest { + email: String, + username: String, + password: String, +} + +struct AtpService { + pub repo: RepoStore, + pub atp_db: AtpDatabase, +} + +#[derive(Debug)] +enum XrpcError { + BadRequest(String), + NotFound(String), + Forbidden(String), +} + +impl std::error::Error for XrpcError {} + +impl fmt::Display for XrpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::BadRequest(msg) | Self::NotFound(msg) | Self::Forbidden(msg) => { + write!(f, "{}", msg) + } + } + } +} + +/// Helper to take an XRPC result (always a JSON object), and transform it to a rouille response +fn xrpc_wrap(resp: Result) -> Response { + match resp { + Ok(val) => Response::json(&val), + Err(e) => { + let msg = e.to_string(); + let code = match e.downcast_ref::() { + Some(XrpcError::BadRequest(_)) => 400, + Some(XrpcError::NotFound(_)) => 404, + Some(XrpcError::Forbidden(_)) => 403, + None => 500, + }; + Response::json(&json!({ "message": msg })).with_status_code(code) + } + } +} + pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> { // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs - // TODO: could just open connection on every request? - let db = Mutex::new(AtpDatabase::open(atp_db_path)?); - let mut _blockstore: BlockStore = - BlockStore::open(blockstore_db_path, Default::default())?; + let srv = Mutex::new(AtpService { + repo: RepoStore::open(blockstore_db_path)?, + atp_db: AtpDatabase::open(atp_db_path)?, + }); let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { info!("{} {} ({:?})", req.method(), req.raw_url(), elap); @@ -42,23 +92,55 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf (GET) ["/"] => { Response::text("Not much to see here yet!") }, - (GET) ["/xrpc/some.method"] => { - Response::text("didn't get a thing") - // TODO: reply with query params as a JSON body + (POST) ["/xrpc/com.atproto.createAccount"] => { + let req: AccountRequest = try_or_400!(rouille::input::json_input(request)); + let mut srv = srv.lock().unwrap(); + xrpc_wrap(srv.atp_db.create_account(&req.username, &req.password, &req.email)) }, - (POST) ["/xrpc/other.method"] => { - Response::text("didn't get other thing") - // TODO: parse and echo back JSON body - }, - - (GET) ["/xrpc/com.atproto.getRecord"] => { - // TODO: JSON response - // TODO: handle error - let mut db = db.lock().unwrap().new_connection().unwrap(); - Response::text(db.get_record("asdf", "123", "blah").unwrap().to_string()) + (GET) ["/xrpc/com.atproto.{endpoint}", endpoint: String] => { + xrpc_wrap(xrpc_get_atproto(&srv, &endpoint, request)) }, _ => rouille::Response::empty_404() ) }) }); } + +fn xrpc_get_atproto( + srv: &Mutex, + method: &str, + request: &Request, +) -> Result { + match method { + "getRecord" => { + let did = request.get_param("user").unwrap(); + let collection = request.get_param("collection").unwrap(); + let rkey = request.get_param("rkey").unwrap(); + let repo_key = format!("/{}/{}", collection, rkey); + let mut srv = srv.lock().expect("service mutex"); + let commit_cid = srv.repo.lookup_commit(&did)?.unwrap(); + let key = format!("/{}/{}", collection, rkey); + match srv.repo.get_record_by_key(&commit_cid, &key) { + // TODO: format as JSON, not text debug + Ok(Some(ipld)) => Ok(json!({ "thing": format!("{:?}", ipld) })), + Ok(None) => Err(anyhow!(XrpcError::NotFound(format!( + "could not find record: {}", + key + )))), + Err(e) => Err(e), + } + } + "syncGetRoot" => { + let did = request.get_param("did").unwrap(); + let mut srv = srv.lock().expect("service mutex"); + srv.repo + .lookup_commit(&did)? + .map(|v| json!({ "root": v })) + .ok_or(anyhow!("XXX: missing")) + } + _ => Err(anyhow!(XrpcError::NotFound(format!( + "XRPC endpoint handler not found: com.atproto.{}", + method + )))), + } +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index 3e01a92..db5e457 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -76,7 +76,7 @@ fn print_mst_keys(db: &mut BlockStore, cid: &Cid) -> Res let mut key: String = "".to_string(); for entry in node.e.iter() { key = format!("{}{}", &key[0..entry.p as usize], entry.k); - println!("{}\t-> {}", key, entry.v); + println!("\t{}\t-> {}", key, entry.v); if let Some(ref right) = entry.t { print_mst_keys(db, right)?; } @@ -92,12 +92,16 @@ pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> { error!("expected at least one alias in block store"); std::process::exit(-1); } - let (alias, commit_cid) = all_aliases[0].clone(); - info!( - "starting from {} [{}]", - commit_cid, - String::from_utf8_lossy(&alias) - ); + + // print all the aliases + for (alias, commit_cid) in all_aliases.iter() { + let did = String::from_utf8_lossy(&alias); + println!("{} -> {}", did, commit_cid); + } + + let (did, commit_cid) = all_aliases[0].clone(); + let did = String::from_utf8_lossy(&did); + info!("starting from {} [{}]", commit_cid, did); // NOTE: the faster way to develop would have been to decode to libipld::ipld::Ipld first? meh @@ -128,6 +132,7 @@ pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> { debug!("MST root node: {:?}", mst_node); debug!("============"); + println!("{}", did); print_mst_keys(&mut db, &root.data)?; Ok(()) } diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index e5de504..713210e 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -35,6 +35,12 @@ impl RepoStore { }) } + pub fn new_connection(&mut self) -> Result { + Ok(RepoStore { + db: self.db.additional_connection()?, + }) + } + pub fn get_ipld(&mut self, cid: &str) -> Result { let ipld_cid = Cid::from_str(cid)?; if let Some(b) = self.db.get_block(&ipld_cid)? { @@ -67,7 +73,7 @@ impl RepoStore { } /// Quick alias lookup - pub fn get_root(&mut self, did: &str) -> Result> { + pub fn lookup_commit(&mut self, did: &str) -> Result> { Ok(self .db .resolve(Cow::from(did.as_bytes()))? -- cgit v1.2.3