diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-04 19:16:01 -0700 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-04 19:16:01 -0700 | 
| commit | bc8493998d90799551c5e0703bbb4a6e69d2478a (patch) | |
| tree | 482d0360ae7f395568e3cdf0fd48d2e5313e9eb9 /adenosine-pds | |
| parent | 5f220855db95d006e4168356759a5b871899d759 (diff) | |
| download | adenosine-bc8493998d90799551c5e0703bbb4a6e69d2478a.tar.gz adenosine-bc8493998d90799551c5e0703bbb4a6e69d2478a.zip | |
pds: basic repo CRUD coming together
Diffstat (limited to 'adenosine-pds')
| -rw-r--r-- | adenosine-pds/plan.txt | 115 | ||||
| -rw-r--r-- | adenosine-pds/src/crypto.rs | 9 | ||||
| -rw-r--r-- | adenosine-pds/src/db.rs | 7 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 170 | ||||
| -rw-r--r-- | adenosine-pds/src/models.rs | 16 | ||||
| -rw-r--r-- | adenosine-pds/src/repo.rs | 36 | 
6 files changed, 293 insertions, 60 deletions
| diff --git a/adenosine-pds/plan.txt b/adenosine-pds/plan.txt index c27299e..05b4b97 100644 --- a/adenosine-pds/plan.txt +++ b/adenosine-pds/plan.txt @@ -18,67 +18,92 @@ x basic crypto and did:plc stuff    x did:key read/write helpers    x test that did:plc generated as expected    x signature read/write helpers -- TODO: why are the multiformat keys so long in did doc? +x single shared signing key for all users (not what I expected) +x sqlite schema (for application)  - MST code to read and mutate tree state +    x  check that empty tree works (eg, for account creation, and after deletes) +    x  with in-memory tests      => mutation batches -    => just read the whole tree and then write the whole tree -    => check that empty tree works (eg, for account creation, and after deletes) -    => with in-memory tests  - service-level config      domain suffixes (eg, just ".test" for now)      account registration allowed or not      CLI account creation (?)      PDS signing key --  +- figure out auth JWT 'sub' situation (not a UCAN? CLI should ignore?) +- switch to Argon2 for passwords? meh +x IPLD objects to JSON value  - implement basic non-authenticated CRUD on repository, test with CLI      com.atproto -        createAccount -        repoDescribe -        repoGetRecord -        repoListRecords -        repoBatchWrite -            repoCreateRecord -            repoPutRecord -            repoDeleteRecord -        syncGetRoot +      x getAccountsConfig +      x createAccount +      x repoGetRecord +      x repoListRecords +      x syncGetRoot +      x repoDescribe +      x createSession +      x repoBatchWrite +      x     repoCreateRecord +      x     repoPutRecord +      x     repoDeleteRecord          syncGetRepo          syncUpdateRepo -- single shared signing key for all users (not what I expected) -- helper web methods - -    xrpc_wrap<S: Serialize>(resp: Result<S>) -> Response - -    xrpc_get_atproto(endpoint: &str, req) -> Result<Value> -    xrpc_post_atproto(endpoint: &str, req) -> Result<Value> - -    xrpc_wrap(xrpc_get_atproto(srv, "asdf", req)) - -? python test script -- sqlite schema (for application) -- write wrapper which updates MST *and* updates other tables in a transaction +x Did, Tid, Nsid types, with TID generator object? +    => xrpc_parse wrappers +- push types back "down" (less String typing in function signatures) +    => eg Cid in repo +- aturi canonicalization helper (re-writes usernames to DID?) +x helper web methods +- python XRPC API test script +- PDS CLI helpers +    create-account <username> <password> <email> +    reset-password <username> <password> +    list-repos +    list-accounts  +    import-car <car-file> [<did|alias>} +- mutate wrapper which updates MST *and* updates other tables in transactions  - JSON schema type generation (separate crate?)  - HTTP API handler implementing many endpoints      com.atproto -        createSession -        getAccountsConfig          getSession          resolveName      app.bsky +        updateProfile +        getProfile +          getHomeFeed          getAuthorFeed -        getLikedBy -        getNotificationCount -        getNotifications          getPostThread -        getProfile -        getRepostedBy +          getUserFollowers          getUserFollows -        getUsersSearch + +        getLikedBy +        getRepostedBy + +        getNotifications +        getNotificationCount          postNotificationsSeen -        updateProfile + +        getUsersSearch + +later: +- TODO: why are the multiformat keys so long in did doc? +- correct JWT helper stuff (?)  - did:web handler? +more PDS CLI helpers +    reindex <did> +        => locally re-generate cache tables +    update-repo <did> [<pds-server>] [<plc-server>] +        => or some full URI or something? +    update <did> [<plc-server>] +        => does this work yet? +    update-all-dids +        => would re-fetch did cods for all non-local users (?) +    spider +        => collect a big list of DIDs +        => fetch all the remote repos for those DIDs +  other utils/helpers:  - pack/unpack a repo CAR into JSON files in a directory tree (plus a commit.json with sig?) @@ -206,3 +231,21 @@ bsky_notification  TODO:  - bsky_badge (etc)  - bsky_media_embed + + +---- + +what is needed to be useful? +- workflow to self-hosting a single user with did:web +- host multiple users with did:plc and wildcard domain +- allow users to bring their own did:web identifier, which is not controlled directly by the PDS +    => some method to register with existing DID +    => export did doc +- some mechanism of automatically pulling from other PDS + +what might be interesting? +- basic read-only web interface +    => tech "explorer" for MST etc +- RSS feeds +- activitypub export +- PDS CLI that can inspect tree structure diff --git a/adenosine-pds/src/crypto.rs b/adenosine-pds/src/crypto.rs index 0720c07..1fa6f4c 100644 --- a/adenosine-pds/src/crypto.rs +++ b/adenosine-pds/src/crypto.rs @@ -6,7 +6,6 @@ use p256;  use p256::ecdsa::signature::{Signer, Verifier};  use std::str::FromStr;  use ucan::builder::UcanBuilder; -use ucan::crypto::KeyMaterial;  // Need to:  // @@ -66,12 +65,12 @@ impl KeyPair {      }      /// This is currently just an un-validated token; we don't actually verify these. -    pub fn ucan(&self) -> Result<String> { +    pub fn ucan(&self, did: &str) -> Result<String> {          let key_material = self.ucan_keymaterial();          let rt = tokio::runtime::Builder::new_current_thread()              .enable_all()              .build()?; -        rt.block_on(build_ucan(key_material)) +        rt.block_on(build_ucan(key_material, did))      }      pub fn to_hex(&self) -> String { @@ -85,10 +84,10 @@ impl KeyPair {      }  } -async fn build_ucan(key_material: P256KeyMaterial) -> Result<String> { +async fn build_ucan(key_material: P256KeyMaterial, did: &str) -> Result<String> {      let token_string = UcanBuilder::default()          .issued_by(&key_material) -        .for_audience(key_material.get_did().await.unwrap().as_str()) +        .for_audience(did)          .with_nonce()          .with_lifetime(60 * 60 * 24 * 90)          .build()? diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 17006cb..e6b957c 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -9,6 +9,9 @@ use serde_json::Value;  use std::path::PathBuf;  use std::str::FromStr; +/// Default is 12, but that is quite slow (on my laptop at least) +const BCRYPT_COST: u32 = 8; +  #[cfg(test)]  mod tests {      use super::*; @@ -78,7 +81,7 @@ impl AtpDatabase {          recovery_pubkey: &str,      ) -> Result<()> {          debug!("bcrypt hashing password (can be slow)..."); -        let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?; +        let password_bcrypt = bcrypt::hash(password, BCRYPT_COST)?;          let mut stmt = self.conn.prepare_cached(              "INSERT INTO account (username, password_bcrypt, email, did, recovery_pubkey) VALUES (?1, ?2, ?3, ?4, ?5)",          )?; @@ -107,7 +110,7 @@ impl AtpDatabase {          if !bcrypt::verify(password, &password_bcrypt)? {              return Err(anyhow!("password did not match"));          } -        let jwt = keypair.ucan()?; +        let jwt = keypair.ucan(&did)?;          let mut stmt = self              .conn              .prepare_cached("INSERT INTO session (did, jwt) VALUES (?1, ?2)")?; diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 0d73881..913b089 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,10 +1,15 @@ +use adenosine_cli::{AtUri, Did, Nsid, Tid, TidLord}; +use anyhow::Context;  use anyhow::{anyhow, Result}; +use libipld::Cid;  use libipld::Ipld;  use log::{debug, error, info, warn};  use rouille::{router, Request, Response};  use serde_json::{json, Value}; +use std::collections::BTreeMap;  use std::fmt;  use std::path::PathBuf; +use std::str::FromStr;  use std::sync::Mutex;  mod car; @@ -20,7 +25,7 @@ pub use car::{load_car_to_blockstore, load_car_to_sqlite};  pub use crypto::{KeyPair, PubKey};  pub use db::AtpDatabase;  pub use models::*; -pub use repo::{RepoCommit, RepoStore}; +pub use repo::{Mutation, RepoCommit, RepoStore};  pub use ucan_p256::P256KeyMaterial;  struct AtpService { @@ -28,6 +33,7 @@ struct AtpService {      pub atp_db: AtpDatabase,      pub pds_keypair: KeyPair,      pub pds_public_url: String, +    pub tid_gen: TidLord,  }  #[derive(Debug)] @@ -78,6 +84,7 @@ pub fn run_server(          atp_db: AtpDatabase::open(atp_db_path)?,          pds_keypair: keypair,          pds_public_url: format!("http://localhost:{}", port).to_string(), +        tid_gen: TidLord::new(),      });      let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { @@ -130,6 +137,31 @@ fn ipld_into_json_value(val: Ipld) -> Value {      }  } +/// Crude reverse generation +/// +/// Does not handle base64 to bytes, and the link generation is pretty simple (object elements with +/// key "car"). Numbers always come through as f64 (float). +fn json_value_into_ipld(val: Value) -> Ipld { +    match val { +        Value::Null => Ipld::Null, +        Value::Bool(b) => Ipld::Bool(b), +        Value::String(s) => Ipld::String(s), +        // TODO: handle numbers better? +        Value::Number(v) => Ipld::Float(v.as_f64().unwrap()), +        Value::Array(l) => Ipld::List(l.into_iter().map(|v| json_value_into_ipld(v)).collect()), +        Value::Object(m) => { +            let map: BTreeMap<String, Ipld> = BTreeMap::from_iter(m.into_iter().map(|(k, v)| { +                if k == "car" && v.is_string() { +                    (k, Ipld::Link(Cid::from_str(v.as_str().unwrap()).unwrap())) +                } else { +                    (k, json_value_into_ipld(v)) +                } +            })); +            Ipld::Map(map) +        } +    } +} +  fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {      Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!(          "require '{}' query parameter", @@ -138,7 +170,11 @@ fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {  }  /// Returns DID of validated user -fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<String> { +fn xrpc_check_auth_header( +    srv: &mut AtpService, +    request: &Request, +    req_did: Option<&Did>, +) -> Result<Did> {      let header = request          .header("Authorization")          .ok_or(XrpcError::Forbidden(format!("require auth header")))?; @@ -146,10 +182,17 @@ fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<Str          Err(XrpcError::Forbidden(format!("require bearer token")))?;      }      let jwt = header.split(" ").nth(1).unwrap(); -    match srv.atp_db.check_auth_token(&jwt)? { -        Some(did) => Ok(did), +    let did = match srv.atp_db.check_auth_token(&jwt)? { +        Some(did) => did,          None => Err(XrpcError::Forbidden(format!("session token not found")))?, +    }; +    let did = Did::from_str(&did)?; +    if req_did.is_some() && Some(&did) != req_did { +        Err(XrpcError::Forbidden(format!( +            "can only modify your own repo" +        )))?;      } +    Ok(did)  }  fn xrpc_get_handler( @@ -161,8 +204,8 @@ fn xrpc_get_handler(          "com.atproto.getAccountsConfig" => {              Ok(json!({"availableUserDomains": ["test"], "inviteCodeRequired": false}))          } -        "com.atproto.getRecord" => { -            let did = xrpc_required_param(request, "did")?; +        "com.atproto.repoGetRecord" => { +            let did = Did::from_str(&xrpc_required_param(request, "user")?)?;              let collection = xrpc_required_param(request, "collection")?;              let rkey = xrpc_required_param(request, "rkey")?;              let mut srv = srv.lock().expect("service mutex"); @@ -178,7 +221,7 @@ fn xrpc_get_handler(              }          }          "com.atproto.syncGetRoot" => { -            let did = xrpc_required_param(request, "did")?; +            let did = Did::from_str(&xrpc_required_param(request, "did")?)?;              let mut srv = srv.lock().expect("service mutex");              srv.repo                  .lookup_commit(&did)? @@ -188,15 +231,17 @@ fn xrpc_get_handler(          "com.atproto.repoListRecords" => {              // TODO: limit, before, after, tid, reverse              // TODO: handle non-DID 'user' -            // TODO: validate 'collection' as an NSID              // TODO: limit result set size -            let did = xrpc_required_param(request, "user")?; -            let collection = xrpc_required_param(request, "collection")?; +            let did = Did::from_str(&xrpc_required_param(request, "user")?)?; +            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?;              let mut record_list: Vec<Value> = vec![];              let mut srv = srv.lock().expect("service mutex"); -            let full_map = srv.repo.mst_to_map(&did)?; +            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap(); +            let last_commit = srv.repo.get_commit(&commit_cid)?; +            let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?;              let prefix = format!("/{}/", collection);              for (mst_key, cid) in full_map.iter() { +                debug!("{}", mst_key);                  if mst_key.starts_with(&prefix) {                      let record = srv.repo.get_ipld(cid)?;                      record_list.push(json!({ @@ -209,15 +254,15 @@ fn xrpc_get_handler(              Ok(json!({ "records": record_list }))          }          "com.atproto.repoDescribe" => { -            let did = xrpc_required_param(request, "user")?; +            let did = Did::from_str(&xrpc_required_param(request, "user")?)?;              // TODO: resolve username? -            let username = did.clone(); +            let username = did.to_string();              let mut srv = srv.lock().expect("service mutex");              let did_doc = srv.atp_db.get_did_doc(&did)?;              let collections: Vec<String> = srv.repo.collections(&did)?;              let desc = RepoDescribe {                  name: username, -                did: did, +                did: did.to_string(),                  didDoc: did_doc,                  collections: collections,                  nameIsCorrect: true, @@ -292,7 +337,7 @@ fn xrpc_post_handler(              Ok(json!(sess))          }          "com.atproto.createSession" => { -            let req: AccountRequest = rouille::input::json_input(request) +            let req: SessionRequest = rouille::input::json_input(request)                  .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?;              let mut srv = srv.lock().unwrap();              let keypair = srv.pds_keypair.clone(); @@ -304,7 +349,7 @@ fn xrpc_post_handler(          }          "com.atproto.deleteSession" => {              let mut srv = srv.lock().unwrap(); -            let _did = xrpc_check_auth_header(&mut srv, request)?; +            let _did = xrpc_check_auth_header(&mut srv, request, None)?;              let header = request                  .header("Authorization")                  .ok_or(XrpcError::Forbidden(format!("require auth header")))?; @@ -319,6 +364,99 @@ fn xrpc_post_handler(              };              Ok(json!({}))          } +        "com.atproto.repoBatchWrite" => { +            let batch: RepoBatchWriteBody = rouille::input::json_input(request)?; +            // TODO: validate edits against schemas +            let did = Did::from_str(&xrpc_required_param(request, "did")?)?; +            let mut srv = srv.lock().unwrap(); +            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?; +            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap(); +            let last_commit = srv.repo.get_commit(&commit_cid)?; +            let mut mutations: Vec<Mutation> = Default::default(); +            for w in batch.writes.iter() { +                let m = match w.op_type.as_str() { +                    "create" => Mutation::Create( +                        Nsid::from_str(&w.collection)?, +                        // TODO: user input unwrap here +                        w.rkey +                            .as_ref() +                            .map(|t| Tid::from_str(&t).unwrap()) +                            .unwrap_or_else(|| srv.tid_gen.next()), +                        json_value_into_ipld(w.value.clone()), +                    ), +                    "update" => Mutation::Update( +                        Nsid::from_str(&w.collection)?, +                        Tid::from_str(w.rkey.as_ref().unwrap())?, +                        json_value_into_ipld(w.value.clone()), +                    ), +                    "delete" => Mutation::Delete( +                        Nsid::from_str(&w.collection)?, +                        Tid::from_str(w.rkey.as_ref().unwrap())?, +                    ), +                    _ => Err(anyhow!("unhandled operation type: {}", w.op_type))?, +                }; +                mutations.push(m); +            } +            let new_mst_cid = srv.repo.update_mst(&last_commit.mst_cid, &mutations)?; +            let new_root_cid = srv.repo.write_root( +                &last_commit.meta_cid, +                Some(&last_commit.commit_cid), +                &new_mst_cid, +            )?; +            srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?; +            // TODO: next handle updates to database +            Ok(json!({})) +        } +        "com.atproto.repoCreateRecord" => { +            // TODO: validate edits against schemas +            let did = Did::from_str(&xrpc_required_param(request, "did")?)?; +            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?; +            let record: Value = rouille::input::json_input(request)?; +            let mut srv = srv.lock().unwrap(); +            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?; +            debug!("reading commit"); +            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap(); +            let last_commit = srv.repo.get_commit(&commit_cid)?; +            let mutations: Vec<Mutation> = vec![Mutation::Create( +                collection, +                srv.tid_gen.next(), +                json_value_into_ipld(record), +            )]; +            debug!("mutating tree"); +            let new_mst_cid = srv +                .repo +                .update_mst(&last_commit.mst_cid, &mutations) +                .context("updating MST in repo")?; +            debug!("writing new root"); +            let new_root_cid = srv.repo.write_root( +                &last_commit.meta_cid, +                Some(&last_commit.commit_cid), +                &new_mst_cid, +            )?; +            debug!("writing new commit"); +            srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?; +            // TODO: next handle updates to database +            Ok(json!({})) +        } +        "com.atproto.repoDeleteRecord" => { +            let did = Did::from_str(&xrpc_required_param(request, "did")?)?; +            let collection = Nsid::from_str(&xrpc_required_param(request, "collection")?)?; +            let tid = Tid::from_str(&xrpc_required_param(request, "rkey")?)?; +            let mut srv = srv.lock().unwrap(); +            let _auth_did = &xrpc_check_auth_header(&mut srv, request, Some(&did))?; +            let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap(); +            let last_commit = srv.repo.get_commit(&commit_cid)?; +            let mutations: Vec<Mutation> = vec![Mutation::Delete(collection, tid)]; +            let new_mst_cid = srv.repo.update_mst(&last_commit.mst_cid, &mutations)?; +            let new_root_cid = srv.repo.write_root( +                &last_commit.meta_cid, +                Some(&last_commit.commit_cid), +                &new_mst_cid, +            )?; +            srv.repo.write_commit(&did, &new_root_cid, "dummy-sig")?; +            // TODO: next handle updates to database +            Ok(json!({})) +        }          _ => Err(anyhow!(XrpcError::NotFound(format!(              "XRPC endpoint handler not found: {}",              method diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs index 9da6104..afadeea 100644 --- a/adenosine-pds/src/models.rs +++ b/adenosine-pds/src/models.rs @@ -35,3 +35,19 @@ pub struct RepoDescribe {      pub collections: Vec<String>,      pub nameIsCorrect: bool,  } + +#[allow(non_snake_case)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct RepoBatchWriteBody { +    pub writes: Vec<RepoBatchWrite>, +} + +#[allow(non_snake_case)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct RepoBatchWrite { +    #[serde(rename = "type")] +    pub op_type: String, +    pub collection: String, +    pub rkey: Option<String>, +    pub value: serde_json::Value, +} diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index 8c14a18..d8c4451 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -1,5 +1,6 @@  use crate::load_car_to_blockstore;  use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode}; +use adenosine_cli::{Did, Nsid, Tid};  use anyhow::{anyhow, ensure, Context, Result};  use ipfs_sqlite_block_store::BlockStore;  use libipld::cbor::DagCborCodec; @@ -7,6 +8,7 @@ use libipld::multihash::Code;  use libipld::prelude::Codec;  use libipld::store::DefaultParams;  use libipld::{Block, Cid, Ipld}; +use log::debug;  use std::borrow::Cow;  use std::collections::BTreeMap;  use std::collections::HashSet; @@ -15,8 +17,10 @@ use std::str::FromStr;  pub struct RepoCommit {      pub sig: Box<[u8]>, +    pub commit_cid: String,      pub did: String,      pub prev: Option<String>, +    pub meta_cid: String,      pub mst_cid: String,  } @@ -24,6 +28,12 @@ pub struct RepoStore {      db: BlockStore<libipld::DefaultParams>,  } +pub enum Mutation { +    Create(Nsid, Tid, Ipld), +    Update(Nsid, Tid, Ipld), +    Delete(Nsid, Tid), +} +  impl RepoStore {      pub fn open(db_path: &PathBuf) -> Result<Self> {          Ok(RepoStore { @@ -127,6 +137,8 @@ impl RepoStore {          );          Ok(RepoCommit {              sig: commit_node.sig, +            commit_cid: commit_cid.to_string(), +            meta_cid: root_node.meta.to_string(),              did: metadata_node.did,              prev: root_node.prev.map(|cid| cid.to_string()),              mst_cid: root_node.data.to_string(), @@ -150,8 +162,9 @@ impl RepoStore {          };          let map = self.mst_to_map(&commit.mst_cid)?;          let mut collections: HashSet<String> = Default::default(); +        // XXX: confirm that keys actually start with leading slash          for k in map.keys() { -            let coll = k.split("/").nth(0).unwrap(); +            let coll = k.split("/").nth(1).unwrap();              collections.insert(coll.to_string());          }          Ok(collections.into_iter().collect()) @@ -224,6 +237,27 @@ impl RepoStore {          Ok(cid_map)      } +    pub fn update_mst(&mut self, mst_cid: &str, mutations: &Vec<Mutation>) -> Result<String> { +        let mut cid_map = self.mst_to_cid_map(mst_cid)?; +        for m in mutations.iter() { +            match m { +                Mutation::Create(collection, tid, val) => { +                    let cid = self.put_ipld(val)?; +                    cid_map.insert(format!("/{}/{}", collection, tid), Cid::from_str(&cid)?); +                } +                Mutation::Update(collection, tid, val) => { +                    let cid = self.put_ipld(val)?; +                    cid_map.insert(format!("/{}/{}", collection, tid), Cid::from_str(&cid)?); +                } +                Mutation::Delete(collection, tid) => { +                    cid_map.remove(&format!("/{}/{}", collection, tid)); +                } +            } +        } +        let mst_cid = generate_mst(&mut self.db, &mut cid_map)?; +        Ok(mst_cid.to_string()) +    } +      /// Returns all the keys for a directory, as a sorted vec of strings      pub fn mst_to_map(&mut self, mst_cid: &str) -> Result<BTreeMap<String, String>> {          let cid_map = self.mst_to_cid_map(mst_cid)?; | 
