diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-02 01:39:19 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-02 01:39:19 -0700 | 
| commit | 5a9333475518c3a7d9396d120e07d813252a0a09 (patch) | |
| tree | d38b3faa7e334e7674f6819f855b5dbbcb9eb180 | |
| parent | f3fdc7ba600c65fdb05efc2fd5e9a651b4b9956e (diff) | |
| download | adenosine-5a9333475518c3a7d9396d120e07d813252a0a09.tar.gz adenosine-5a9333475518c3a7d9396d120e07d813252a0a09.zip | |
pds: refactoring
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 8 | ||||
| -rw-r--r-- | adenosine-pds/src/db.rs | 2 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 120 | ||||
| -rw-r--r-- | adenosine-pds/src/mst.rs | 19 | ||||
| -rw-r--r-- | adenosine-pds/src/repo.rs | 8 | 
5 files changed, 129 insertions, 28 deletions
| diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index 0159ab7..cfae0ca 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -51,6 +51,9 @@ enum Command {      Import {          /// CARv1 file path to import from          car_path: std::path::PathBuf, + +        /// name of pointer to root of CAR DAG tree. Usually a DID +        alias: String,      },      /// Helper to print MST keys/docs from a sqlite repo @@ -82,7 +85,10 @@ fn main() -> Result<()> {              // TODO: log some config stuff?              run_server(port, &opt.blockstore_db_path, &opt.atp_db_path)          } -        Command::Import { car_path } => load_car_to_sqlite(&opt.blockstore_db_path, &car_path), +        // TODO: handle alias +        Command::Import { car_path, alias } => { +            load_car_to_sqlite(&opt.blockstore_db_path, &car_path) +        }          Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path),      }  } diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 905210c..35c798d 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -2,6 +2,7 @@ use crate::AtpSession;  /// ATP database (as distinct from blockstore)  use anyhow::{anyhow, Result};  use lazy_static::lazy_static; +use log::debug;  use rusqlite::{params, Connection};  use rusqlite_migration::{Migrations, M};  use serde_json::Value; @@ -107,6 +108,7 @@ impl AtpDatabase {          // TODO: insert did_doc          // TODO: also need to initialize repo with... profile?          { +            debug!("bcrypt hashing password (can be slow)...");              let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;              let signing_key = "key:TODO";              let did = "did:TODO"; diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 0ef081d..e52cb69 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,6 +1,8 @@ -use anyhow::Result; +use anyhow::{anyhow, Result};  use log::{error, info}; -use rouille::{router, Request, Response}; +use rouille::{router, try_or_400, Request, Response}; +use serde_json::json; +use std::fmt;  use std::path::PathBuf;  use std::sync::Mutex; @@ -17,13 +19,61 @@ pub use db::AtpDatabase;  pub use models::*;  pub use repo::{RepoCommit, RepoStore}; +#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +struct AccountRequest { +    email: String, +    username: String, +    password: String, +} + +struct AtpService { +    pub repo: RepoStore, +    pub atp_db: AtpDatabase, +} + +#[derive(Debug)] +enum XrpcError { +    BadRequest(String), +    NotFound(String), +    Forbidden(String), +} + +impl std::error::Error for XrpcError {} + +impl fmt::Display for XrpcError { +    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +        match self { +            Self::BadRequest(msg) | Self::NotFound(msg) | Self::Forbidden(msg) => { +                write!(f, "{}", msg) +            } +        } +    } +} + +/// Helper to take an XRPC result (always a JSON object), and transform it to a rouille response +fn xrpc_wrap<S: serde::Serialize>(resp: Result<S>) -> Response { +    match resp { +        Ok(val) => Response::json(&val), +        Err(e) => { +            let msg = e.to_string(); +            let code = match e.downcast_ref::<XrpcError>() { +                Some(XrpcError::BadRequest(_)) => 400, +                Some(XrpcError::NotFound(_)) => 404, +                Some(XrpcError::Forbidden(_)) => 403, +                None => 500, +            }; +            Response::json(&json!({ "message": msg })).with_status_code(code) +        } +    } +} +  pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> {      // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs -    // TODO: could just open connection on every request? -    let db = Mutex::new(AtpDatabase::open(atp_db_path)?); -    let mut _blockstore: BlockStore<libipld::DefaultParams> = -        BlockStore::open(blockstore_db_path, Default::default())?; +    let srv = Mutex::new(AtpService { +        repo: RepoStore::open(blockstore_db_path)?, +        atp_db: AtpDatabase::open(atp_db_path)?, +    });      let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {          info!("{} {} ({:?})", req.method(), req.raw_url(), elap); @@ -42,23 +92,55 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf                  (GET) ["/"] => {                      Response::text("Not much to see here yet!")                  }, -                (GET) ["/xrpc/some.method"] => { -                    Response::text("didn't get a thing") -                    // TODO: reply with query params as a JSON body +                (POST) ["/xrpc/com.atproto.createAccount"] => { +                    let req: AccountRequest = try_or_400!(rouille::input::json_input(request)); +                    let mut srv = srv.lock().unwrap(); +                    xrpc_wrap(srv.atp_db.create_account(&req.username, &req.password, &req.email))                  }, -                (POST) ["/xrpc/other.method"] => { -                    Response::text("didn't get other thing") -                    // TODO: parse and echo back JSON body -                }, - -                (GET) ["/xrpc/com.atproto.getRecord"] => { -                    // TODO: JSON response -                    // TODO: handle error -                    let mut db = db.lock().unwrap().new_connection().unwrap(); -                    Response::text(db.get_record("asdf", "123", "blah").unwrap().to_string()) +                (GET) ["/xrpc/com.atproto.{endpoint}", endpoint: String] => { +                    xrpc_wrap(xrpc_get_atproto(&srv, &endpoint, request))                  },                  _ => rouille::Response::empty_404()              )          })      });  } + +fn xrpc_get_atproto( +    srv: &Mutex<AtpService>, +    method: &str, +    request: &Request, +) -> Result<serde_json::Value> { +    match method { +        "getRecord" => { +            let did = request.get_param("user").unwrap(); +            let collection = request.get_param("collection").unwrap(); +            let rkey = request.get_param("rkey").unwrap(); +            let repo_key = format!("/{}/{}", collection, rkey); +            let mut srv = srv.lock().expect("service mutex"); +            let commit_cid = srv.repo.lookup_commit(&did)?.unwrap(); +            let key = format!("/{}/{}", collection, rkey); +            match srv.repo.get_record_by_key(&commit_cid, &key) { +                // TODO: format as JSON, not text debug +                Ok(Some(ipld)) => Ok(json!({ "thing": format!("{:?}", ipld) })), +                Ok(None) => Err(anyhow!(XrpcError::NotFound(format!( +                    "could not find record: {}", +                    key +                )))), +                Err(e) => Err(e), +            } +        } +        "syncGetRoot" => { +            let did = request.get_param("did").unwrap(); +            let mut srv = srv.lock().expect("service mutex"); +            srv.repo +                .lookup_commit(&did)? +                .map(|v| json!({ "root": v })) +                .ok_or(anyhow!("XXX: missing")) +        } +        _ => Err(anyhow!(XrpcError::NotFound(format!( +            "XRPC endpoint handler not found: com.atproto.{}", +            method +        )))), +    } +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index 3e01a92..db5e457 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -76,7 +76,7 @@ fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Res      let mut key: String = "".to_string();      for entry in node.e.iter() {          key = format!("{}{}", &key[0..entry.p as usize], entry.k); -        println!("{}\t-> {}", key, entry.v); +        println!("\t{}\t-> {}", key, entry.v);          if let Some(ref right) = entry.t {              print_mst_keys(db, right)?;          } @@ -92,12 +92,16 @@ pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> {          error!("expected at least one alias in block store");          std::process::exit(-1);      } -    let (alias, commit_cid) = all_aliases[0].clone(); -    info!( -        "starting from {} [{}]", -        commit_cid, -        String::from_utf8_lossy(&alias) -    ); + +    // print all the aliases +    for (alias, commit_cid) in all_aliases.iter() { +        let did = String::from_utf8_lossy(&alias); +        println!("{} -> {}", did, commit_cid); +    } + +    let (did, commit_cid) = all_aliases[0].clone(); +    let did = String::from_utf8_lossy(&did); +    info!("starting from {} [{}]", commit_cid, did);      // NOTE: the faster way to develop would have been to decode to libipld::ipld::Ipld first? meh @@ -128,6 +132,7 @@ pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> {      debug!("MST root node: {:?}", mst_node);      debug!("============"); +    println!("{}", did);      print_mst_keys(&mut db, &root.data)?;      Ok(())  } diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index e5de504..713210e 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -35,6 +35,12 @@ impl RepoStore {          })      } +    pub fn new_connection(&mut self) -> Result<Self> { +        Ok(RepoStore { +            db: self.db.additional_connection()?, +        }) +    } +      pub fn get_ipld(&mut self, cid: &str) -> Result<Ipld> {          let ipld_cid = Cid::from_str(cid)?;          if let Some(b) = self.db.get_block(&ipld_cid)? { @@ -67,7 +73,7 @@ impl RepoStore {      }      /// Quick alias lookup -    pub fn get_root(&mut self, did: &str) -> Result<Option<String>> { +    pub fn lookup_commit(&mut self, did: &str) -> Result<Option<String>> {          Ok(self              .db              .resolve(Cow::from(did.as_bytes()))? | 
