aboutsummaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-11-04 19:16:01 -0700
committerBryan Newbold <bnewbold@robocracy.org>2022-11-04 19:16:01 -0700
commitbc8493998d90799551c5e0703bbb4a6e69d2478a (patch)
tree482d0360ae7f395568e3cdf0fd48d2e5313e9eb9 /adenosine-pds/src
parent5f220855db95d006e4168356759a5b871899d759 (diff)
downloadadenosine-bc8493998d90799551c5e0703bbb4a6e69d2478a.tar.gz
adenosine-bc8493998d90799551c5e0703bbb4a6e69d2478a.zip
pds: basic repo CRUD coming together
Diffstat (limited to 'adenosine-pds/src')
-rw-r--r--adenosine-pds/src/crypto.rs9
-rw-r--r--adenosine-pds/src/db.rs7
-rw-r--r--adenosine-pds/src/lib.rs170
-rw-r--r--adenosine-pds/src/models.rs16
-rw-r--r--adenosine-pds/src/repo.rs36
5 files changed, 214 insertions, 24 deletions
diff --git a/adenosine-pds/src/crypto.rs b/adenosine-pds/src/crypto.rs
index 0720c07..1fa6f4c 100644
--- a/adenosine-pds/src/crypto.rs
+++ b/adenosine-pds/src/crypto.rs
@@ -6,7 +6,6 @@ use p256;
use p256::ecdsa::signature::{Signer, Verifier};
use std::str::FromStr;
use ucan::builder::UcanBuilder;
-use ucan::crypto::KeyMaterial;
// Need to:
//
@@ -66,12 +65,12 @@ impl KeyPair {
}
/// This is currently just an un-validated token; we don't actually verify these.
- pub fn ucan(&self) -> Result<String> {
+ pub fn ucan(&self, did: &str) -> Result<String> {
let key_material = self.ucan_keymaterial();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
- rt.block_on(build_ucan(key_material))
+ rt.block_on(build_ucan(key_material, did))
}
pub fn to_hex(&self) -> String {
@@ -85,10 +84,10 @@ impl KeyPair {
}
}
-async fn build_ucan(key_material: P256KeyMaterial) -> Result<String> {
+async fn build_ucan(key_material: P256KeyMaterial, did: &str) -> Result<String> {
let token_string = UcanBuilder::default()
.issued_by(&key_material)
- .for_audience(key_material.get_did().await.unwrap().as_str())
+ .for_audience(did)
.with_nonce()
.with_lifetime(60 * 60 * 24 * 90)
.build()?
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
index 17006cb..e6b957c 100644
--- a/adenosine-pds/src/db.rs
+++ b/adenosine-pds/src/db.rs
@@ -9,6 +9,9 @@ use serde_json::Value;
use std::path::PathBuf;
use std::str::FromStr;
+/// Default is 12, but that is quite slow (on my laptop at least)
+const BCRYPT_COST: u32 = 8;
+
#[cfg(test)]
mod tests {
use super::*;
@@ -78,7 +81,7 @@ impl AtpDatabase {
recovery_pubkey: &str,
) -> Result<()> {
debug!("bcrypt hashing password (can be slow)...");
- let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
+ let password_bcrypt = bcrypt::hash(password, BCRYPT_COST)?;
let mut stmt = self.conn.prepare_cached(
"INSERT INTO account (username, password_bcrypt, email, did, recovery_pubkey) VALUES (?1, ?2, ?3, ?4, ?5)",
)?;
@@ -107,7 +110,7 @@ impl AtpDatabase {
if !bcrypt::verify(password, &password_bcrypt)? {
return Err(anyhow!("password did not match"));
}
- let jwt = keypair.ucan()?;
+ let jwt = keypair.ucan(&did)?;
let mut stmt = self
.conn
.prepare_cached("INSERT INTO session (did, jwt) VALUES (?1, ?2)")?;
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 0d73881..913b089 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,10 +1,15 @@
+use adenosine_cli::{AtUri, Did, Nsid, Tid, TidLord};
+use anyhow::Context;
use anyhow::{anyhow, Result};
+use libipld::Cid;
use libipld::Ipld;
use log::{debug, error, info, warn};
use rouille::{router, Request, Response};
use serde_json::{json, Value};
+use std::collections::BTreeMap;
use std::fmt;
use std::path::PathBuf;
+use std::str::FromStr;
use std::sync::Mutex;
mod car;
@@ -20,7 +25,7 @@ pub use car::{load_car_to_blockstore, load_car_to_sqlite};
pub use crypto::{KeyPair, PubKey};
pub use db::AtpDatabase;
pub use models::*;
-pub use repo::{RepoCommit, RepoStore};
+pub use repo::{Mutation, RepoCommit, RepoStore};
pub use ucan_p256::P256KeyMaterial;
struct AtpService {
@@ -28,6 +33,7 @@ struct AtpService {
pub atp_db: AtpDatabase,
pub pds_keypair: KeyPair,
pub pds_public_url: String,
+ pub tid_gen: TidLord,
}
#[derive(Debug)]
@@ -78,6 +84,7 @@ pub fn run_server(
atp_db: AtpDatabase::open(atp_db_path)?,
pds_keypair: keypair,
pds_public_url: format!("http://localhost:{}", port).to_string(),
+ tid_gen: TidLord::new(),
});
let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
@@ -130,6 +137,31 @@ fn ipld_into_json_value(val: Ipld) -> Value {
}
}
+/// Crude reverse generation
+///
+/// Does not handle base64 to bytes, and the link generation is pretty simple (object elements with
+/// key "car"). Numbers always come through as f64 (float).
+fn json_value_into_ipld(val: Value) -> Ipld {
+ match val {
+ Value::Null => Ipld::Null,
+ Value::Bool(b) => Ipld::Bool(b),
+ Value::String(s) => Ipld::String(s),
+ // TODO: handle numbers better?
+ Value::Number(v) => Ipld::Float(v.as_f64().unwrap()),
+ Value::Array(l) => Ipld::List(l.into_iter().map(|v| json_value_into_ipld(v)).collect()),
+ Value::Object(m) => {
+ let map: BTreeMap<String, Ipld> = BTreeMap::from_iter(m.into_iter().map(|(k, v)| {
+ if k == "car" && v.is_string() {
+ (k, Ipld::Link(Cid::from_str(v.as_str().unwrap()).unwrap()))
+ } else {
+ (k, json_value_into_ipld(v))
+ }
+ }));
+ Ipld::Map(map)
+ }
+ }
+}
+
fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!(
"require '{}' query parameter",
@@ -138,7 +170,11 @@ fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
}
/// Returns DID of validated user
-fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<String> {
+fn xrpc_check_auth_header(
+ srv: &mut AtpService,
+ request: &Request,
+ req_did: Option<&Did>,
+) -> Result<Did> {
let header = request
.header("Authorization")
.ok_or(XrpcError::Forbidden(format!("require auth header")))?;
@@ -146,10 +182,17 @@ fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<Str
Err(XrpcError::Forbidden(format!("require bearer token")))?;
}
let jwt = header.split(" ").nth(1).unwrap();
- match srv.atp_db.check_auth_token(&jwt)? {
- Some(did) => Ok(did),
+ let did = match srv.atp_db.check_auth_token(&jwt)? {
+ Some(did) => did,
None => Err(XrpcError::Forbidden(format!("session token not found")))?,
+ };
+ let did = Did::from_str(&did)?;
+ if req_did.is_some() && Some(&did) != req_did {
+ Err(XrpcError::Forbidden(format!(
+ "can only modify your own repo"
+ )))?;
}
+ Ok(did)
}
fn xrpc_get_handler(
@@ -161,8 +204,8 @@ fn xrpc_get_handler(
"com.atproto.getAccountsConfig" => {
Ok(json!({"availableUserDomains": ["test"], "inviteCodeRequired": false}))
}
- "com.atproto.getRecord" => {
- let did = xrpc_required_param(request, "did")?;
+ "com.atproto.repoGetRecord" => {
+ let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
let collection = xrpc_required_param(request, "collection")?;
let rkey = xrpc_required_param(request, "rkey")?;
let mut srv = srv.lock().expect("service mutex");
@@ -178,7 +221,7 @@ fn xrpc_get_handler(
}
}
"com.atproto.syncGetRoot" => {
- let did = xrpc_required_param(request, "did")?;
+ let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
let mut srv = srv.lock().expect("service mutex");
srv.repo
.lookup_commit(&did)?
@@ -188,15 +231,17 @@ fn xrpc_get_handler(
"com.atproto.repoListRecords" => {
// TODO: limit, before, after, tid, reverse
// TODO: handle non-DID 'user'
- // TODO: validate 'collection' as an NSID
// TODO: limit result set size
- let did = xrpc_required_param(request, "user")?;
- let collection = xrpc_required_param(request, "collection")?;
+ let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
+ let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
let mut record_list: Vec<Value> = vec![];
let mut srv = srv.lock().expect("service mutex");
- let full_map = srv.repo.mst_to_map(&did)?;
+ let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+ let last_commit = srv.repo.get_commit(&commit_cid)?;
+ let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;
let prefix = format!("/{}/", collection);
for (mst_key, cid) in full_map.iter() {
+ debug!("{}", mst_key);
if mst_key.starts_with(&prefix) {
let record = srv.repo.get_ipld(cid)?;
record_list.push(json!({
@@ -209,15 +254,15 @@ fn xrpc_get_handler(
Ok(json!({ "records": record_list }))
}
"com.atproto.repoDescribe" => {
- let did = xrpc_required_param(request, "user")?;
+ let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
// TODO: resolve username?
- let username = did.clone();
+ let username = did.to_string();
let mut srv = srv.lock().expect("service mutex");
let did_doc = srv.atp_db.get_did_doc(&did)?;
let collections: Vec<String> = srv.repo.collections(&did)?;
let desc = RepoDescribe {
name: username,
- did: did,
+ did: did.to_string(),
didDoc: did_doc,
collections: collections,
nameIsCorrect: true,
@@ -292,7 +337,7 @@ fn xrpc_post_handler(
Ok(json!(sess))
}
"com.atproto.createSession" => {
- let req: AccountRequest = rouille::input::json_input(request)
+ let req: SessionRequest = rouille::input::json_input(request)
.map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?;
let mut srv = srv.lock().unwrap();
let keypair = srv.pds_keypair.clone();
@@ -304,7 +349,7 @@ fn xrpc_post_handler(
}
"com.atproto.deleteSession" => {
let mut srv = srv.lock().unwrap();
- let _did = xrpc_check_auth_header(&mut srv, request)?;
+ let _did = xrpc_check_auth_header(&mut srv, request, None)?;
let header = request
.header("Authorization")
.ok_or(XrpcError::Forbidden(format!("require auth header")))?;
@@ -319,6 +364,99 @@ fn xrpc_post_handler(
};
Ok(json!({}))
}
+ "com.atproto.repoBatchWrite" => {
+ let batch: RepoBatchWriteBody = rouille::input::json_input(request)?;
+ // TODO: validate edits against schemas
+ let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+ let mut srv = srv.lock().unwrap();
+ let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+ let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+ let last_commit = srv.repo.get_commit(&commit_cid)?;
+ let mut mutations: Vec<Mutation> = Default::default();
+ for w in batch.writes.iter() {
+ let m = match w.op_type.as_str() {
+ "create" => Mutation::Create(
+ Nsid::from_str(&w.collection)?,
+ // TODO: user input unwrap here
+ w.rkey
+ .as_ref()
+ .map(|t| Tid::from_str(&t).unwrap())
+ .unwrap_or_else(|| srv.tid_gen.next()),
+ json_value_into_ipld(w.value.clone()),
+ ),
+ "update" => Mutation::Update(
+ Nsid::from_str(&w.collection)?,
+ Tid::from_str(w.rkey.as_ref().unwrap())?,
+ json_value_into_ipld(w.value.clone()),
+ ),
+ "delete" => Mutation::Delete(
+ Nsid::from_str(&w.collection)?,
+ Tid::from_str(w.rkey.as_ref().unwrap())?,
+ ),
+ _ => Err(anyhow!("unhandled operation type: {}", w.op_type))?,
+ };
+ mutations.push(m);
+ }
+ let new_mst_cid = srv.repo.update_mst(&last_commit.mst_cid, &mutations)?;
+ let new_root_cid = srv.repo.write_root(
+ &last_commit.meta_cid,
+ Some(&last_commit.commit_cid),
+ &new_mst_cid,
+ )?;
+ srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?;
+ // TODO: next handle updates to database
+ Ok(json!({}))
+ }
+ "com.atproto.repoCreateRecord" => {
+ // TODO: validate edits against schemas
+ let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+ let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
+ let record: Value = rouille::input::json_input(request)?;
+ let mut srv = srv.lock().unwrap();
+ let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+ debug!("reading commit");
+ let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+ let last_commit = srv.repo.get_commit(&commit_cid)?;
+ let mutations: Vec<Mutation> = vec![Mutation::Create(
+ collection,
+ srv.tid_gen.next(),
+ json_value_into_ipld(record),
+ )];
+ debug!("mutating tree");
+ let new_mst_cid = srv
+ .repo
+ .update_mst(&last_commit.mst_cid, &mutations)
+ .context("updating MST in repo")?;
+ debug!("writing new root");
+ let new_root_cid = srv.repo.write_root(
+ &last_commit.meta_cid,
+ Some(&last_commit.commit_cid),
+ &new_mst_cid,
+ )?;
+ debug!("writing new commit");
+ srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?;
+ // TODO: next handle updates to database
+ Ok(json!({}))
+ }
+ "com.atproto.repoDeleteRecord" => {
+ let did = Did::from_str(&xrpc_required_param(request, "did")?)?;
+ let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;
+ let tid = Tid::from_str(&xrpc_required_param(request, "rkey")?)?;
+ let mut srv = srv.lock().unwrap();
+ let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?;
+ let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap();
+ let last_commit = srv.repo.get_commit(&commit_cid)?;
+ let mutations: Vec<Mutation> = vec![Mutation::Delete(collection, tid)];
+ let new_mst_cid = srv.repo.update_mst(&last_commit.mst_cid, &mutations)?;
+ let new_root_cid = srv.repo.write_root(
+ &last_commit.meta_cid,
+ Some(&last_commit.commit_cid),
+ &new_mst_cid,
+ )?;
+ srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?;
+ // TODO: next handle updates to database
+ Ok(json!({}))
+ }
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: {}",
method
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
index 9da6104..afadeea 100644
--- a/adenosine-pds/src/models.rs
+++ b/adenosine-pds/src/models.rs
@@ -35,3 +35,19 @@ pub struct RepoDescribe {
pub collections: Vec<String>,
pub nameIsCorrect: bool,
}
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct RepoBatchWriteBody {
+ pub writes: Vec<RepoBatchWrite>,
+}
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct RepoBatchWrite {
+ #[serde(rename = "type")]
+ pub op_type: String,
+ pub collection: String,
+ pub rkey: Option<String>,
+ pub value: serde_json::Value,
+}
diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs
index 8c14a18..d8c4451 100644
--- a/adenosine-pds/src/repo.rs
+++ b/adenosine-pds/src/repo.rs
@@ -1,5 +1,6 @@
use crate::load_car_to_blockstore;
use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode};
+use adenosine_cli::{Did, Nsid, Tid};
use anyhow::{anyhow, ensure, Context, Result};
use ipfs_sqlite_block_store::BlockStore;
use libipld::cbor::DagCborCodec;
@@ -7,6 +8,7 @@ use libipld::multihash::Code;
use libipld::prelude::Codec;
use libipld::store::DefaultParams;
use libipld::{Block, Cid, Ipld};
+use log::debug;
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::collections::HashSet;
@@ -15,8 +17,10 @@ use std::str::FromStr;
pub struct RepoCommit {
pub sig: Box<[u8]>,
+ pub commit_cid: String,
pub did: String,
pub prev: Option<String>,
+ pub meta_cid: String,
pub mst_cid: String,
}
@@ -24,6 +28,12 @@ pub struct RepoStore {
db: BlockStore<libipld::DefaultParams>,
}
+pub enum Mutation {
+ Create(Nsid, Tid, Ipld),
+ Update(Nsid, Tid, Ipld),
+ Delete(Nsid, Tid),
+}
+
impl RepoStore {
pub fn open(db_path: &PathBuf) -> Result<Self> {
Ok(RepoStore {
@@ -127,6 +137,8 @@ impl RepoStore {
);
Ok(RepoCommit {
sig: commit_node.sig,
+ commit_cid: commit_cid.to_string(),
+ meta_cid: root_node.meta.to_string(),
did: metadata_node.did,
prev: root_node.prev.map(|cid| cid.to_string()),
mst_cid: root_node.data.to_string(),
@@ -150,8 +162,9 @@ impl RepoStore {
};
let map = self.mst_to_map(&commit.mst_cid)?;
let mut collections: HashSet<String> = Default::default();
+ // XXX: confirm that keys actually start with leading slash
for k in map.keys() {
- let coll = k.split("/").nth(0).unwrap();
+ let coll = k.split("/").nth(1).unwrap();
collections.insert(coll.to_string());
}
Ok(collections.into_iter().collect())
@@ -224,6 +237,27 @@ impl RepoStore {
Ok(cid_map)
}
+ pub fn update_mst(&mut self, mst_cid: &str, mutations: &Vec<Mutation>) -> Result<String> {
+ let mut cid_map = self.mst_to_cid_map(mst_cid)?;
+ for m in mutations.iter() {
+ match m {
+ Mutation::Create(collection, tid, val) => {
+ let cid = self.put_ipld(val)?;
+ cid_map.insert(format!("/{}/{}", collection, tid), Cid::from_str(&cid)?);
+ }
+ Mutation::Update(collection, tid, val) => {
+ let cid = self.put_ipld(val)?;
+ cid_map.insert(format!("/{}/{}", collection, tid), Cid::from_str(&cid)?);
+ }
+ Mutation::Delete(collection, tid) => {
+ cid_map.remove(&format!("/{}/{}", collection, tid));
+ }
+ }
+ }
+ let mst_cid = generate_mst(&mut self.db, &mut cid_map)?;
+ Ok(mst_cid.to_string())
+ }
+
/// Returns all the keys for a directory, as a sorted vec of strings
pub fn mst_to_map(&mut self, mst_cid: &str) -> Result<BTreeMap<String, String>> {
let cid_map = self.mst_to_cid_map(mst_cid)?;