From bc8493998d90799551c5e0703bbb4a6e69d2478a Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Fri, 4 Nov 2022 19:16:01 -0700
Subject: pds: basic repo CRUD coming together

---
 adenosine-pds/src/crypto.rs |   9 ++-
 adenosine-pds/src/db.rs     |   7 +-
 adenosine-pds/src/lib.rs    | 170 +++++++++++++++++++++++++++++++++++++++-----
 adenosine-pds/src/models.rs |  16 +++++
 adenosine-pds/src/repo.rs   |  36 +++++++++-
 5 files changed, 214 insertions(+), 24 deletions(-)

(limited to 'adenosine-pds/src')

diff --git a/adenosine-pds/src/crypto.rs b/adenosine-pds/src/crypto.rs
index 0720c07..1fa6f4c 100644
--- a/adenosine-pds/src/crypto.rs
+++ b/adenosine-pds/src/crypto.rs
@@ -6,7 +6,6 @@ use p256;
 use p256::ecdsa::signature::{Signer, Verifier};
 use std::str::FromStr;
 use ucan::builder::UcanBuilder;
-use ucan::crypto::KeyMaterial;
 
 // Need to:
 //
@@ -66,12 +65,12 @@ impl KeyPair {
     }
 
     /// This is currently just an un-validated token; we don't actually verify these.
-    pub fn ucan(&self) -> Result<String> {
+    pub fn ucan(&self, did: &str) -> Result<String> {
         let key_material = self.ucan_keymaterial();
         let rt = tokio::runtime::Builder::new_current_thread()
             .enable_all()
             .build()?;
-        rt.block_on(build_ucan(key_material))
+        rt.block_on(build_ucan(key_material, did))
     }
 
     pub fn to_hex(&self) -> String {
@@ -85,10 +84,10 @@ impl KeyPair {
     }
 }
 
-async fn build_ucan(key_material: P256KeyMaterial) -> Result<String> {
+async fn build_ucan(key_material: P256KeyMaterial, did: &str) -> Result<String> {
     let token_string = UcanBuilder::default()
         .issued_by(&key_material)
-        .for_audience(key_material.get_did().await.unwrap().as_str())
+        .for_audience(did)
         .with_nonce()
         .with_lifetime(60 * 60 * 24 * 90)
         .build()?
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
index 17006cb..e6b957c 100644
--- a/adenosine-pds/src/db.rs
+++ b/adenosine-pds/src/db.rs
@@ -9,6 +9,9 @@ use serde_json::Value;
 use std::path::PathBuf;
 use std::str::FromStr;
 
+/// Default is 12, but that is quite slow (on my laptop at least)
+const BCRYPT_COST: u32 = 8;
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -78,7 +81,7 @@ impl AtpDatabase {
         recovery_pubkey: &str,
     ) -> Result<()> {
         debug!("bcrypt hashing password (can be slow)...");
-        let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
+        let password_bcrypt = bcrypt::hash(password, BCRYPT_COST)?;
         let mut stmt = self.conn.prepare_cached(
             "INSERT INTO account (username, password_bcrypt, email, did, recovery_pubkey) VALUES (?1, ?2, ?3, ?4, ?5)",
         )?;
@@ -107,7 +110,7 @@ impl AtpDatabase {
         if !bcrypt::verify(password, &password_bcrypt)? {
             return Err(anyhow!("password did not match"));
         }
-        let jwt = keypair.ucan()?;
+        let jwt = keypair.ucan(&did)?;
         let mut stmt = self
             .conn
             .prepare_cached("INSERT INTO session (did, jwt) VALUES (?1, ?2)")?;
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 0d73881..913b089 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,10 +1,15 @@
+use adenosine_cli::{AtUri, Did, Nsid, Tid, TidLord};
+use anyhow::Context;
 use anyhow::{anyhow, Result};
+use libipld::Cid;
 use libipld::Ipld;
 use log::{debug, error, info, warn};
 use rouille::{router, Request, Response};
 use serde_json::{json, Value};
+use std::collections::BTreeMap;
 use std::fmt;
 use std::path::PathBuf;
+use std::str::FromStr;
 use std::sync::Mutex;
 
 mod car;
@@ -20,7 +25,7 @@ pub use car::{load_car_to_blockstore, load_car_to_sqlite};
 pub use crypto::{KeyPair, PubKey};
 pub use db::AtpDatabase;
 pub use models::*;
-pub use repo::{RepoCommit, RepoStore};
+pub use repo::{Mutation, RepoCommit, RepoStore};
 pub use ucan_p256::P256KeyMaterial;
 
 struct AtpService {
@@ -28,6 +33,7 @@ struct AtpService {
     pub atp_db: AtpDatabase,
     pub pds_keypair: KeyPair,
     pub pds_public_url: String,
+    pub tid_gen: TidLord,
 }
 
 #[derive(Debug)]
@@ -78,6 +84,7 @@ pub fn run_server(
         atp_db: AtpDatabase::open(atp_db_path)?,
         pds_keypair: keypair,
         pds_public_url: format!("http://localhost:{}", port).to_string(),
+        tid_gen: TidLord::new(),
     });
 
     let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
@@ -130,6 +137,31 @@ fn ipld_into_json_value(val: Ipld) -> Value {
     }
 }
 
+/// Crude reverse generation
+///
+/// Does not handle base64 to bytes, and the link generation is pretty simple (object elements with
+/// key "car"). Numbers always come through as f64 (float).
+fn json_value_into_ipld(val: Value) -> Ipld {
+    match val {
+        Value::Null => Ipld::Null,
+        Value::Bool(b) => Ipld::Bool(b),
+        Value::String(s) => Ipld::String(s),
+        // TODO: handle numbers better?
+        Value::Number(v) => Ipld::Float(v.as_f64().unwrap()),
+        Value::Array(l) => Ipld::List(l.into_iter().map(|v| json_value_into_ipld(v)).collect()),
+        Value::Object(m) => {
+            let map: BTreeMap<String, Ipld> = BTreeMap::from_iter(m.into_iter().map(|(k, v)| {
+                if k == "car" && v.is_string() {
+                    (k, Ipld::Link(Cid::from_str(v.as_str().unwrap()).unwrap()))
+                } else {
+                    (k, json_value_into_ipld(v))
+                }
+            }));
+            Ipld::Map(map)
+        }
+    }
+}
+
 fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
     Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!(
         "require '{}' query parameter",
@@ -138,7 +170,11 @@ fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
 }
 
 /// Returns DID of validated user
-fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<String> {
+fn xrpc_check_auth_header(
+    srv: &mut AtpService,
+    request: &Request,
+    req_did: Option<&Did>,
+) -> Result<Did> {
     let header = request
         .header("Authorization")
         .ok_or(XrpcError::Forbidden(format!("require auth header")))?;
@@ -146,10 +182,17 @@ fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<String> {
         Err(XrpcError::Forbidden(format!("require bearer token")))?;
     }
     let jwt = header.split(" ").nth(1).unwrap();
-    match srv.atp_db.check_auth_token(&jwt)? {
-        Some(did) => Ok(did),
+    let did = match srv.atp_db.check_auth_token(&jwt)? {
+        Some(did) => did,
         None => Err(XrpcError::Forbidden(format!("session token not found")))?,
+    };
+    let did = Did::from_str(&did)?;
+    if req_did.is_some() && Some(&did) != req_did {
+        Err(XrpcError::Forbidden(format!(
+            "can only modify your own repo"
+        )))?;
     }
+    Ok(did)
 }
 
 fn xrpc_get_handler(
@@ -161,8 +204,8 @@ fn xrpc_get_handler(
         "com.atproto.getAccountsConfig" => {
             Ok(json!({"availableUserDomains": ["test"], "inviteCodeRequired": false}))
         }
-        "com.atproto.getRecord" => {
-            let did = xrpc_required_param(request, "did")?;
+        "com.atproto.repoGetRecord" => {
+            let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
             let collection = xrpc_required_param(request, "collection")?;
             let rkey = xrpc_required_param(request, "rkey")?;
             let mut srv = srv.lock().expect("service mutex");
@@ -178,7 +221,7 @@ fn xrpc_get_handler(
             }
         }
         "com.atproto.syncGetRoot" => {
-            let did = xrpc_required_param(request, "did")?;
+            let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
             let mut srv = srv.lock().expect("service mutex");
             srv.repo
                 .lookup_commit(&did)?
@@ -188,15 +231,17 @@ fn xrpc_get_handler(
         "com.atproto.repoListRecords" => {
             // TODO: limit, before, after, tid, reverse
             // TODO: handle non-DID 'user'
-            // TODO: validate 'collection' as an NSID
             // TODO: limit result set size
-            let did = xrpc_required_param(request, "user")?;
-            let collection = xrpc_required_param(request, "collection")?;
+            let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
+            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
             let mut record_list: Vec<Value> = vec![];
             let mut srv = srv.lock().expect("service mutex");
-            let full_map = srv.repo.mst_to_map(&did)?;
+            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+            let last_commit = srv.repo.get_commit(&commit_cid)?;
+            let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
             let prefix = format!("/{}/", collection);
             for (mst_key, cid) in full_map.iter() {
+                debug!("{}", mst_key);
                 if mst_key.starts_with(&prefix) {
                     let record = srv.repo.get_ipld(cid)?;
                     record_list.push(json!({
@@ -209,15 +254,15 @@ fn xrpc_get_handler(
             Ok(json!({ "records": record_list }))
         }
         "com.atproto.repoDescribe" => {
-            let did = xrpc_required_param(request, "user")?;
+            let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
             // TODO: resolve username?
-            let username = did.clone();
+            let username = did.to_string();
             let mut srv = srv.lock().expect("service mutex");
             let did_doc = srv.atp_db.get_did_doc(&did)?;
             let collections: Vec<String> = srv.repo.collections(&did)?;
             let desc = RepoDescribe {
                 name: username,
-                did: did,
+                did: did.to_string(),
                 didDoc: did_doc,
                 collections: collections,
                 nameIsCorrect: true,
@@ -292,7 +337,7 @@ fn xrpc_post_handler(
             Ok(json!(sess))
         }
         "com.atproto.createSession" => {
-            let req: AccountRequest = rouille::input::json_input(request)
+            let req: SessionRequest = rouille::input::json_input(request)
                 .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?;
             let mut srv = srv.lock().unwrap();
             let keypair = srv.pds_keypair.clone();
@@ -304,7 +349,7 @@ fn xrpc_post_handler(
         }
         "com.atproto.deleteSession" => {
            let mut srv = srv.lock().unwrap();
-            let _did = xrpc_check_auth_header(&mut srv, request)?;
+            let _did = xrpc_check_auth_header(&mut srv, request, None)?;
            let header = request
                .header("Authorization")
                .ok_or(XrpcError::Forbidden(format!("require auth header")))?;
@@ -319,6 +364,99 @@ fn xrpc_post_handler(
             };
             Ok(json!({}))
         }
+        "com.atproto.repoBatchWrite" => {
+            let batch: RepoBatchWriteBody = rouille::input::json_input(request)?;
+            // TODO: validate edits against schemas
+            let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+            let mut srv = srv.lock().unwrap();
+            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+            let last_commit = srv.repo.get_commit(&commit_cid)?;
+            let mut mutations: Vec<Mutation> = Default::default();
+            for w in batch.writes.iter() {
+                let m = match w.op_type.as_str() {
+                    "create" => Mutation::Create(
+                        Nsid::from_str(&w.collection)?,
+                        // TODO: user input unwrap here
+                        w.rkey
+                            .as_ref()
+                            .map(|t| Tid::from_str(&t).unwrap())
+                            .unwrap_or_else(|| srv.tid_gen.next()),
+                        json_value_into_ipld(w.value.clone()),
+                    ),
+                    "update" => Mutation::Update(
+                        Nsid::from_str(&w.collection)?,
+                        Tid::from_str(w.rkey.as_ref().unwrap())?,
+                        json_value_into_ipld(w.value.clone()),
+                    ),
+                    "delete" => Mutation::Delete(
+                        Nsid::from_str(&w.collection)?,
+                        Tid::from_str(w.rkey.as_ref().unwrap())?,
+                    ),
+                    _ => Err(anyhow!("unhandled operation type: {}", w.op_type))?,
+                };
+                mutations.push(m);
+            }
+            let new_mst_cid = srv.repo.update_mst(&last_commit.mst_cid, &mutations)?;
+            let new_root_cid = srv.repo.write_root(
+                &last_commit.meta_cid,
+                Some(&last_commit.commit_cid),
+                &new_mst_cid,
+            )?;
+            srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?;
+            // TODO: next handle updates to database
+            Ok(json!({}))
+        }
+        "com.atproto.repoCreateRecord" => {
+            // TODO: validate edits against schemas
+            let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
+            let record: Value = rouille::input::json_input(request)?;
+            let mut srv = srv.lock().unwrap();
+            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+            debug!("reading commit");
+            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+            let last_commit = srv.repo.get_commit(&commit_cid)?;
+            let mutations: Vec<Mutation> = vec![Mutation::Create(
+                collection,
+                srv.tid_gen.next(),
+                json_value_into_ipld(record),
+            )];
+            debug!("mutating tree");
+            let new_mst_cid = srv
+                .repo
+                .update_mst(&last_commit.mst_cid, &mutations)
+                .context("updating MST in repo")?;
+            debug!("writing new root");
+            let new_root_cid = srv.repo.write_root(
+                &last_commit.meta_cid,
+                Some(&last_commit.commit_cid),
+                &new_mst_cid,
+            )?;
+            debug!("writing new commit");
+            srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?;
+            // TODO: next handle updates to database
+            Ok(json!({}))
+        }
+        "com.atproto.repoDeleteRecord" => {
+            let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
+            let tid = Tid::from_str(&xrpc_required_param(request, "rkey")?)?;
+            let mut srv = srv.lock().unwrap();
+            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+            let last_commit = srv.repo.get_commit(&commit_cid)?;
+            let mutations: Vec<Mutation> = vec![Mutation::Delete(collection, tid)];
+            let new_mst_cid = srv.repo.update_mst(&last_commit.mst_cid, &mutations)?;
+            let new_root_cid = srv.repo.write_root(
+                &last_commit.meta_cid,
+                Some(&last_commit.commit_cid),
+                &new_mst_cid,
+            )?;
+            srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?;
+            // TODO: next handle updates to database
+            Ok(json!({}))
+        }
         _ => Err(anyhow!(XrpcError::NotFound(format!(
             "XRPC endpoint handler not found: {}",
             method
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
index 9da6104..afadeea 100644
--- a/adenosine-pds/src/models.rs
+++ b/adenosine-pds/src/models.rs
@@ -35,3 +35,19 @@ pub struct RepoDescribe {
     pub collections: Vec<String>,
     pub nameIsCorrect: bool,
 }
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct RepoBatchWriteBody {
+    pub writes: Vec<RepoBatchWrite>,
+}
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct RepoBatchWrite {
+    #[serde(rename = "type")]
+    pub op_type: String,
+    pub collection: String,
+    pub rkey: Option<String>,
+    pub value: serde_json::Value,
+}
diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs
index 8c14a18..d8c4451 100644
--- a/adenosine-pds/src/repo.rs
+++ b/adenosine-pds/src/repo.rs
@@ -1,5 +1,6 @@
 use crate::load_car_to_blockstore;
 use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode};
+use adenosine_cli::{Did, Nsid, Tid};
 use anyhow::{anyhow, ensure, Context, Result};
 use ipfs_sqlite_block_store::BlockStore;
 use libipld::cbor::DagCborCodec;
@@ -7,6 +8,7 @@ use libipld::multihash::Code;
 use libipld::prelude::Codec;
 use libipld::store::DefaultParams;
 use libipld::{Block, Cid, Ipld};
+use log::debug;
 use std::borrow::Cow;
 use std::collections::BTreeMap;
 use std::collections::HashSet;
@@ -15,8 +17,10 @@ use std::str::FromStr;
 
 pub struct RepoCommit {
     pub sig: Box<[u8]>,
+    pub commit_cid: String,
     pub did: String,
     pub prev: Option<String>,
+    pub meta_cid: String,
     pub mst_cid: String,
 }
 
@@ -24,6 +28,12 @@ pub struct RepoStore {
     db: BlockStore<DefaultParams>,
 }
 
+pub enum Mutation {
+    Create(Nsid, Tid, Ipld),
+    Update(Nsid, Tid, Ipld),
+    Delete(Nsid, Tid),
+}
+
 impl RepoStore {
     pub fn open(db_path: &PathBuf) -> Result<Self> {
         Ok(RepoStore {
@@ -127,6 +137,8 @@ impl RepoStore {
         );
         Ok(RepoCommit {
             sig: commit_node.sig,
+            commit_cid: commit_cid.to_string(),
+            meta_cid: root_node.meta.to_string(),
             did: metadata_node.did,
             prev: root_node.prev.map(|cid| cid.to_string()),
             mst_cid: root_node.data.to_string(),
@@ -150,8 +162,9 @@ impl RepoStore {
         };
         let map = self.mst_to_map(&commit.mst_cid)?;
         let mut collections: HashSet<String> = Default::default();
+        // XXX: confirm that keys actually start with leading slash
         for k in map.keys() {
-            let coll = k.split("/").nth(0).unwrap();
+            let coll = k.split("/").nth(1).unwrap();
             collections.insert(coll.to_string());
         }
         Ok(collections.into_iter().collect())
@@ -224,6 +237,27 @@ impl RepoStore {
         Ok(cid_map)
     }
 
+    pub fn update_mst(&mut self, mst_cid: &str, mutations: &Vec<Mutation>) -> Result<String> {
+        let mut cid_map = self.mst_to_cid_map(mst_cid)?;
+        for m in mutations.iter() {
+            match m {
+                Mutation::Create(collection, tid, val) => {
+                    let cid = self.put_ipld(val)?;
+                    cid_map.insert(format!("/{}/{}", collection, tid), Cid::from_str(&cid)?);
+                }
+                Mutation::Update(collection, tid, val) => {
+                    let cid = self.put_ipld(val)?;
+                    cid_map.insert(format!("/{}/{}", collection, tid), Cid::from_str(&cid)?);
+                }
+                Mutation::Delete(collection, tid) => {
+                    cid_map.remove(&format!("/{}/{}", collection, tid));
+                }
+            }
+        }
+        let mst_cid = generate_mst(&mut self.db, &mut cid_map)?;
+        Ok(mst_cid.to_string())
+    }
+
     /// Returns all the keys for a directory, as a sorted vec of strings
     pub fn mst_to_map(&mut self, mst_cid: &str) -> Result<BTreeMap<String, String>> {
         let cid_map = self.mst_to_cid_map(mst_cid)?;
--
cgit v1.2.3
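
Usage sketch (an editor's illustration, not part of the commit): the new com.atproto.repoCreateRecord handler reads `did` and `collection` from query parameters, authenticates via a Bearer session token checked by xrpc_check_auth_header(), and takes the record itself as the JSON request body. The Rust snippet below shows one plausible client call; the /xrpc/ URL prefix, port 3030, the `app.bsky.post` collection, the record fields, and the use of the `reqwest` crate (blocking + json features) are assumptions for illustration, not details taken from this patch.

// Hypothetical client for the com.atproto.repoCreateRecord endpoint added above.
// Endpoint details not visible in the patch (URL prefix, port, collection NSID,
// record shape) are assumed for illustration only.
use anyhow::Result;
use serde_json::json;

fn create_record_example(jwt: &str, did: &str) -> Result<String> {
    let client = reqwest::blocking::Client::new();
    let resp = client
        .post("http://localhost:3030/xrpc/com.atproto.repoCreateRecord")
        // the handler reads these via xrpc_required_param()
        .query(&[("did", did), ("collection", "app.bsky.post")])
        // xrpc_check_auth_header() expects "Authorization: Bearer <session jwt>"
        .bearer_auth(jwt)
        // the JSON body is the record; json_value_into_ipld() converts it to IPLD
        .json(&json!({ "text": "hello world", "createdAt": "2022-11-04T19:16:01Z" }))
        .send()?
        .error_for_status()?;
    Ok(resp.text()?)
}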