From 9b4a5c6bfd2b816fc9b281bdd12e67a0e0245f4c Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 2 Nov 2022 21:10:57 -0700 Subject: pds: progress on repo mst wrapper --- adenosine-pds/src/car.rs | 13 +-- adenosine-pds/src/lib.rs | 34 +++++-- adenosine-pds/src/mst.rs | 24 +++-- adenosine-pds/src/repo.rs | 246 ++++++++++++++++++++++++++++++++++++++-------- 4 files changed, 255 insertions(+), 62 deletions(-) (limited to 'adenosine-pds/src') diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs index 5731848..35cc3fd 100644 --- a/adenosine-pds/src/car.rs +++ b/adenosine-pds/src/car.rs @@ -3,7 +3,7 @@ use anyhow::Result; use futures::TryStreamExt; use ipfs_sqlite_block_store::BlockStore; use iroh_car::CarReader; -use libipld::block::Block; +use libipld::{Block, Cid}; use std::path::PathBuf; use tokio::fs::File; use tokio::io::BufReader; @@ -12,13 +12,14 @@ pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { let mut db: BlockStore = { BlockStore::open(db_path, Default::default())? }; - load_car_to_blockstore(&mut db, car_path) + load_car_to_blockstore(&mut db, car_path)?; + Ok(()) } pub fn load_car_to_blockstore( db: &mut BlockStore, car_path: &PathBuf, -) -> Result<()> { +) -> Result { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; @@ -29,7 +30,7 @@ pub fn load_car_to_blockstore( async fn inner_car_loader( db: &mut BlockStore, car_path: &PathBuf, -) -> Result<()> { +) -> Result { println!( "{} - {}", std::env::current_dir()?.display(), @@ -52,10 +53,10 @@ async fn inner_car_loader( }) .await?; - // pin the header + // pin the header (?) if car_header.roots().len() >= 1 { db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; } - Ok(()) + Ok(car_header.roots()[0]) } diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 8263ddc..90cac3f 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -93,10 +93,8 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf (GET) ["/"] => { Response::text("Not much to see here yet!") }, - (POST) ["/xrpc/com.atproto.createAccount"] => { - let req: AccountRequest = try_or_400!(rouille::input::json_input(request)); - let mut srv = srv.lock().unwrap(); - xrpc_wrap(srv.atp_db.create_account(&req.username, &req.password, &req.email)) + (POST) ["/xrpc/com.atproto.{endpoint}", endpoint: String] => { + xrpc_wrap(xrpc_post_atproto(&srv, &endpoint, request)) }, (GET) ["/xrpc/com.atproto.{endpoint}", endpoint: String] => { xrpc_wrap(xrpc_get_atproto(&srv, &endpoint, request)) @@ -120,11 +118,9 @@ fn xrpc_get_atproto( let did = request.get_param("user").unwrap(); let collection = request.get_param("collection").unwrap(); let rkey = request.get_param("rkey").unwrap(); - let repo_key = format!("/{}/{}", collection, rkey); let mut srv = srv.lock().expect("service mutex"); - let commit_cid = srv.repo.lookup_commit(&did)?.unwrap(); let key = format!("/{}/{}", collection, rkey); - match srv.repo.get_record_by_key(&commit_cid, &key) { + match srv.repo.get_atp_record(&did, &collection, &rkey) { // TODO: format as JSON, not text debug Ok(Some(ipld)) => Ok(json!({ "thing": format!("{:?}", ipld) })), Ok(None) => Err(anyhow!(XrpcError::NotFound(format!( @@ -148,3 +144,27 @@ fn xrpc_get_atproto( )))), } } + +fn xrpc_post_atproto( + srv: &Mutex, + method: &str, + request: &Request, +) -> Result { + match method { + "createAccount" => { + // TODO: failure here is a 400, not 500 + let req: AccountRequest = rouille::input::json_input(request) + .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?; + let mut srv = srv.lock().unwrap(); + Ok(serde_json::to_value(srv.atp_db.create_account( + &req.username, + &req.password, + &req.email, + )?)?) + } + _ => Err(anyhow!(XrpcError::NotFound(format!( + "XRPC endpoint handler not found: com.atproto.{}", + method + )))), + } +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index db5e457..b71cc73 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -1,5 +1,5 @@ use crate::load_car_to_blockstore; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use ipfs_sqlite_block_store::BlockStore; use libipld::cbor::DagCborCodec; use libipld::multihash::Code; @@ -61,10 +61,13 @@ struct WipNode { } fn get_mst_node(db: &mut BlockStore, cid: &Cid) -> Result { - let mst_node: MstNode = DagCborCodec.decode( - &db.get_block(cid)? - .ok_or(anyhow!("expected block in store"))?, - )?; + let block = &db + .get_block(cid)? + .ok_or(anyhow!("reading MST node from blockstore"))?; + //println!("{:?}", block); + let mst_node: MstNode = DagCborCodec + .decode(block) + .context("parsing MST DAG-CBOR IPLD node from blockstore")?; Ok(mst_node) } @@ -168,9 +171,9 @@ fn leading_zeros(key: &str) -> u8 { digest.len() as u8 } -fn generate_mst( +pub fn generate_mst( db: &mut BlockStore, - map: &mut BTreeMap, + map: &BTreeMap, ) -> Result { // construct a "WIP" tree let mut root: Option = None; @@ -192,7 +195,12 @@ fn generate_mst( }); } } - serialize_wip_tree(db, root.expect("non-empty MST tree")) + let empty_node = WipNode { + height: 0, + left: None, + entries: vec![], + }; + serialize_wip_tree(db, root.unwrap_or(empty_node)) } fn insert_entry(mut node: WipNode, entry: WipEntry) -> WipNode { diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index b74615b..419d23f 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -1,5 +1,6 @@ -use crate::mst::{collect_mst_keys, CommitNode, MetadataNode, RootNode}; -use anyhow::{anyhow, Result}; +use crate::load_car_to_blockstore; +use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode}; +use anyhow::{anyhow, ensure, Context, Result}; use ipfs_sqlite_block_store::BlockStore; use libipld::cbor::DagCborCodec; use libipld::multihash::Code; @@ -57,18 +58,25 @@ impl RepoStore { } /// Returns CID that was inserted - pub fn put_ipld(&mut self, record: &Ipld) -> Result { + pub fn put_ipld>( + &mut self, + record: &S, + ) -> Result { let block = Block::::encode(DagCborCodec, Code::Sha2_256, record)?; let cid = block.cid().clone(); - self.db.put_block(block, None)?; + self.db + .put_block(block, None) + .context("writing IPLD DAG-CBOR record to blockstore")?; Ok(cid.to_string()) } /// Returns CID that was inserted - pub fn put_blob(&mut self, data: Vec) -> Result { - let block = Block::::encode(libipld::raw::RawCodec, Code::Sha2_256, &data)?; + pub fn put_blob(&mut self, data: &[u8]) -> Result { + let block = Block::::encode(libipld::raw::RawCodec, Code::Sha2_256, data)?; let cid = block.cid().clone(); - self.db.put_block(block, None)?; + self.db + .put_block(block, None) + .context("writing non-record blob to blockstore")?; Ok(cid.to_string()) } @@ -82,26 +90,40 @@ impl RepoStore { pub fn get_commit(&mut self, commit_cid: &str) -> Result { // read records by CID: commit, root, meta - let commit_node: CommitNode = DagCborCodec.decode( - &self - .db - .get_block(&Cid::from_str(commit_cid)?)? - .ok_or(anyhow!("expected commit block in store"))?, - )?; - let root_node: RootNode = DagCborCodec.decode( - &self - .db - .get_block(&commit_node.root)? - .ok_or(anyhow!("expected root block in store"))?, - )?; - let metadata_node: MetadataNode = DagCborCodec.decode( - &self - .db - .get_block(&root_node.meta)? - .ok_or(anyhow!("expected metadata block in store"))?, - )?; - assert_eq!(metadata_node.datastore, "mst"); - assert_eq!(metadata_node.version, 1); + let commit_node: CommitNode = DagCborCodec + .decode( + &self + .db + .get_block(&Cid::from_str(commit_cid)?)? + .ok_or(anyhow!("expected commit block in store"))?, + ) + .context("parsing commit IPLD node from blockstore")?; + let root_node: RootNode = DagCborCodec + .decode( + &self + .db + .get_block(&commit_node.root)? + .ok_or(anyhow!("expected root block in store"))?, + ) + .context("parsing root IPLD node from blockstore")?; + let metadata_node: MetadataNode = DagCborCodec + .decode( + &self + .db + .get_block(&root_node.meta)? + .ok_or(anyhow!("expected metadata block in store"))?, + ) + .context("parsing metadata IPLD node from blockstore")?; + ensure!( + metadata_node.datastore == "mst", + "unexpected repo metadata.datastore: {}", + metadata_node.datastore + ); + ensure!( + metadata_node.version == 1, + "unexpected repo metadata.version: {}", + metadata_node.version + ); Ok(RepoCommit { sig: commit_node.sig, did: metadata_node.did, @@ -110,8 +132,8 @@ impl RepoStore { }) } - pub fn get_record_by_key(&mut self, commit_cid: &str, key: &str) -> Result> { - let map = self.as_map(commit_cid)?; + pub fn get_mst_record_by_key(&mut self, mst_cid: &str, key: &str) -> Result> { + let map = self.mst_to_map(mst_cid)?; if let Some(cid) = map.get(key) { self.get_ipld(&cid.to_string()).map(|v| Some(v)) } else { @@ -119,35 +141,88 @@ impl RepoStore { } } - pub fn write_root(&mut self, did: &str, mst_cid: &str, prev: Option<&str>) -> Result { - unimplemented!() + pub fn get_atp_record( + &mut self, + did: &str, + collection: &str, + tid: &str, + ) -> Result> { + let commit = if let Some(c) = self.lookup_commit(did)? { + self.get_commit(&c)? + } else { + return Ok(None); + }; + let record_key = format!("/{}/{}", collection, tid); + self.get_mst_record_by_key(&commit.mst_cid, &record_key) + } + + pub fn write_metadata(&mut self, did: &str) -> Result { + self.put_ipld(&MetadataNode { + datastore: "mst".to_string(), + did: did.to_string(), + version: 1, + }) + } + + pub fn write_root( + &mut self, + did: &str, + meta_cid: &str, + prev: Option<&str>, + mst_cid: &str, + ) -> Result { + self.put_ipld(&RootNode { + auth_token: None, + // TODO: not unwrap here + prev: prev.map(|s| Cid::from_str(s).unwrap()), + // TODO: not 'metadata'? + meta: Cid::from_str(meta_cid)?, + data: Cid::from_str(mst_cid)?, + }) } pub fn write_commit(&mut self, did: &str, root_cid: &str, sig: &str) -> Result { - // TODO: also update alias to point to new commit - unimplemented!() + let commit_cid = self.put_ipld(&CommitNode { + root: Cid::from_str(root_cid)?, + sig: sig.as_bytes().to_vec().into_boxed_slice(), + })?; + self.db + .alias(did.as_bytes().to_vec(), Some(&Cid::from_str(&commit_cid)?))?; + Ok(commit_cid.to_string()) } - pub fn write_map(&self, map: Result>) -> Result { - unimplemented!() + pub fn mst_from_map(&mut self, map: &BTreeMap) -> Result { + // TODO: not unwrap in iter + let mut cid_map: BTreeMap = BTreeMap::from_iter( + map.iter() + .map(|(k, v)| (k.to_string(), Cid::from_str(&v).unwrap())), + ); + let mst_cid = generate_mst(&mut self.db, &mut cid_map)?; + Ok(mst_cid.to_string()) } - fn as_cid_map(&mut self, commit_cid: &str) -> Result> { - let commit = self.get_commit(commit_cid)?; + fn mst_to_cid_map(&mut self, mst_cid: &str) -> Result> { let mut cid_map: BTreeMap = Default::default(); - let mst_cid = Cid::from_str(&commit.mst_cid)?; - collect_mst_keys(&mut self.db, &mst_cid, &mut cid_map)?; + let mst_cid = Cid::from_str(mst_cid)?; + collect_mst_keys(&mut self.db, &mst_cid, &mut cid_map) + .context("reading repo MST from blockstore")?; Ok(cid_map) } /// Returns all the keys for a directory, as a sorted vec of strings - pub fn as_map(&mut self, commit_cid: &str) -> Result> { - let cid_map = self.as_cid_map(commit_cid)?; + pub fn mst_to_map(&mut self, mst_cid: &str) -> Result> { + let cid_map = self.mst_to_cid_map(mst_cid)?; let ret_map: BTreeMap = BTreeMap::from_iter(cid_map.into_iter().map(|(k, v)| (k, v.to_string()))); Ok(ret_map) } + /// returns the root commit from CAR file + pub fn load_car(&mut self, car_path: &PathBuf) -> Result { + let cid = load_car_to_blockstore(&mut self.db, car_path)?; + Ok(cid.to_string()) + } + /// Exports in CAR format to a Writer /// /// The "from" commit CID feature is not implemented. @@ -160,3 +235,92 @@ impl RepoStore { unimplemented!() } } + +#[test] +fn test_repo_mst() { + use libipld::ipld; + + let mut repo = RepoStore::open_ephemeral().unwrap(); + let did = "did:plc:dummy"; + + // basic blob and IPLD record put/get + let blob = b"beware the swamp thing"; + let blob_cid: String = repo.put_blob(blob).unwrap(); + + let record = ipld!({"some-thing": 123}); + let record_cid: String = repo.put_ipld(&record).unwrap(); + + repo.get_blob(&blob_cid).unwrap().unwrap(); + repo.get_ipld(&record_cid).unwrap(); + + // basic MST get/put + let mut map: BTreeMap = Default::default(); + let empty_map_cid: String = repo.mst_from_map(&map).unwrap(); + assert_eq!(map, repo.mst_to_map(&empty_map_cid).unwrap()); + assert!(repo + .get_mst_record_by_key(&empty_map_cid, "/records/1") + .unwrap() + .is_none()); + + map.insert("/blobs/1".to_string(), blob_cid.clone()); + map.insert("/blobs/2".to_string(), blob_cid.clone()); + map.insert("/records/1".to_string(), record_cid.clone()); + map.insert("/records/2".to_string(), record_cid.clone()); + let simple_map_cid: String = repo.mst_from_map(&map).unwrap(); + assert_eq!(map, repo.mst_to_map(&simple_map_cid).unwrap()); + + // create root and commit IPLD nodes + let meta_cid = repo.write_metadata(did).unwrap(); + let simple_root_cid = repo + .write_root(did, &meta_cid, None, &simple_map_cid) + .unwrap(); + let simple_commit_cid = repo + .write_commit(did, &simple_root_cid, "dummy-sig") + .unwrap(); + assert_eq!( + Some(record.clone()), + repo.get_mst_record_by_key(&simple_map_cid, "/records/1") + .unwrap() + ); + assert_eq!( + Some(record.clone()), + repo.get_atp_record(did, "records", "1").unwrap() + ); + assert!(repo + .get_mst_record_by_key(&simple_map_cid, "/records/3") + .unwrap() + .is_none()); + assert!(repo.get_atp_record(did, "records", "3").unwrap().is_none()); + assert_eq!( + Some(simple_commit_cid.clone()), + repo.lookup_commit(did).unwrap() + ); + + map.insert("/records/3".to_string(), record_cid.clone()); + let simple3_map_cid: String = repo.mst_from_map(&map).unwrap(); + let simple3_root_cid = repo + .write_root(did, &meta_cid, Some(&simple_commit_cid), &simple3_map_cid) + .unwrap(); + let simple3_commit_cid = repo + .write_commit(did, &simple3_root_cid, "dummy-sig3") + .unwrap(); + assert_eq!(map, repo.mst_to_map(&simple3_map_cid).unwrap()); + assert_eq!( + Some(record.clone()), + repo.get_mst_record_by_key(&simple3_map_cid, "/records/3") + .unwrap() + ); + assert_eq!( + Some(record.clone()), + repo.get_atp_record(did, "records", "3").unwrap() + ); + let commit = repo.get_commit(&simple3_commit_cid).unwrap(); + assert_eq!(commit.sig.to_vec(), b"dummy-sig3".to_vec()); + assert_eq!(commit.did, did); + assert_eq!(commit.prev, Some(simple_commit_cid)); + assert_eq!(commit.mst_cid, simple3_map_cid); + assert_eq!( + Some(simple3_commit_cid.clone()), + repo.lookup_commit(did).unwrap() + ); +} -- cgit v1.2.3