From 9c8aa3d684575b5b5f169b8f6aca75919283d251 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 7 Nov 2022 17:20:28 -0800 Subject: pds: implement CAR import/export at repo level --- adenosine-pds/src/bin/adenosine-pds.rs | 4 +- adenosine-pds/src/car.rs | 86 ++++++++++++++++++++++++---------- adenosine-pds/src/lib.rs | 50 +++++++++++++++----- adenosine-pds/src/mst.rs | 48 ------------------- adenosine-pds/src/repo.rs | 74 +++++++++++++++++++++++++---- adenosine-pds/src/web.rs | 7 ++- 6 files changed, 169 insertions(+), 100 deletions(-) (limited to 'adenosine-pds/src') diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index beb423a..f654dc1 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -105,7 +105,9 @@ fn main() -> Result<()> { } // TODO: handle alias Command::Import { car_path, alias } => { - load_car_to_sqlite(&opt.blockstore_db_path, &car_path, &alias) + let mut repo = RepoStore::open(&opt.blockstore_db_path)?; + repo.import_car_path(&car_path, Some(alias))?; + Ok(()) } Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path), Command::GenerateSecret {} => { diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs index 43b4e1f..832c87e 100644 --- a/adenosine-pds/src/car.rs +++ b/adenosine-pds/src/car.rs @@ -1,50 +1,76 @@ use anyhow::Result; -use crate::vendored::iroh_car::CarReader; +use crate::vendored::iroh_car::{CarHeader, CarReader, CarWriter}; use futures::TryStreamExt; use ipfs_sqlite_block_store::BlockStore; use libipld::{Block, Cid}; use std::path::PathBuf; use tokio::fs::File; -use tokio::io::BufReader; +use tokio::io::{AsyncRead, BufReader}; -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> { - let mut db: BlockStore = - { BlockStore::open(db_path, Default::default())? }; - - load_car_to_blockstore(&mut db, car_path, alias)?; - Ok(()) +/// Synchronous wrapper for loading in-memory CAR bytes (`&[u8]`) into a blockstore. +/// +/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file +/// header. +pub fn load_car_bytes_to_blockstore( + db: &mut BlockStore, + car_bytes: &[u8], +) -> Result { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_bytes_loader(db, car_bytes)) } -pub fn load_car_to_blockstore( +/// Synchronous wrapper for loading on-disk CAR file (by path) into a blockstore. +/// +/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file +/// header. +pub fn load_car_path_to_blockstore( db: &mut BlockStore, car_path: &PathBuf, - alias: &str, ) -> Result { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(inner_car_loader(db, car_path, alias)) + rt.block_on(inner_car_path_loader(db, car_path)) +} + +pub fn read_car_bytes_from_blockstore( + db: &mut BlockStore, + root: &Cid, +) -> Result> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_bytes_reader(db, root)) } -// this async function is wrapped in the sync version above -async fn inner_car_loader( +async fn inner_car_bytes_loader( + db: &mut BlockStore, + car_bytes: &[u8], +) -> Result { + let car_reader = CarReader::new(car_bytes).await?; + inner_car_loader(db, car_reader).await +} + +async fn inner_car_path_loader( db: &mut BlockStore, car_path: &PathBuf, - alias: &str, ) -> Result { - println!( - "{} - {}", - std::env::current_dir()?.display(), - car_path.display() - ); let car_reader = { let file = File::open(car_path).await?; let buf_reader = BufReader::new(file); CarReader::new(buf_reader).await? }; - let car_header = car_reader.header().clone(); + inner_car_loader(db, car_reader).await +} +async fn inner_car_loader( + db: &mut BlockStore, + car_reader: CarReader, +) -> Result { + let car_header = car_reader.header().clone(); car_reader .stream() .try_for_each(|(cid, raw)| { @@ -54,11 +80,21 @@ async fn inner_car_loader( futures::future::ready(Ok(())) }) .await?; + Ok(car_header.roots()[0]) +} - // pin the header (?) - if !car_header.roots().is_empty() { - db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?; - } +async fn inner_car_bytes_reader( + db: &mut BlockStore, + root: &Cid, +) -> Result> { + let car_header = CarHeader::new_v1(vec![root.clone()]); + let buf: Vec = Default::default(); + let mut car_writer = CarWriter::new(car_header, buf); - Ok(car_header.roots()[0]) + let cid_list = db.get_descendants::>(root)?; + for cid in cid_list { + let block = db.get_block(&cid)?.expect("block content"); + car_writer.write(cid, block).await?; + } + Ok(car_writer.finish().await?) } diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index ee281e1..009175d 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,5 +1,6 @@ use adenosine_cli::identifiers::{Did, Nsid, Tid, TidLord}; use anyhow::{anyhow, Result}; +use askama::Template; use libipld::Cid; use libipld::Ipld; use log::{debug, error, info, warn}; @@ -10,7 +11,6 @@ use std::fmt; use std::path::PathBuf; use std::str::FromStr; use std::sync::Mutex; -use askama::Template; mod car; mod crypto; @@ -23,7 +23,6 @@ mod ucan_p256; mod vendored; mod web; -pub use car::{load_car_to_blockstore, load_car_to_sqlite}; pub use crypto::{KeyPair, PubKey}; pub use db::AtpDatabase; pub use models::*; @@ -90,7 +89,10 @@ fn web_wrap(resp: Result) -> Response { None => 500, }; warn!("HTTP {}: {}", code, msg); - Response::html(format!("

{}

{}", code, msg)) + Response::html(format!( + "

{}

{}", + code, msg + )) } } } @@ -521,10 +523,17 @@ fn profile_handler(srv: &Mutex, did: &str, _request: &Request) -> Re did: did, profile: json!({}), feed: vec![], - }.render()?) + } + .render()?) } -fn post_handler(srv: &Mutex, did: &str, collection: &str, tid: &str, _request: &Request) -> Result { +fn post_handler( + srv: &Mutex, + did: &str, + collection: &str, + tid: &str, + _request: &Request, +) -> Result { let did = Did::from_str(did)?; let collection = Nsid::from_str(collection)?; let rkey = Tid::from_str(tid)?; @@ -534,7 +543,8 @@ fn post_handler(srv: &Mutex, did: &str, collection: &str, tid: &str, // TODO: format as JSON, not text debug Ok(Some(ipld)) => ipld_into_json_value(ipld), Ok(None) => Err(anyhow!(XrpcError::NotFound(format!( - "could not find record: /{}/{}", collection, rkey + "could not find record: /{}/{}", + collection, rkey ))))?, Err(e) => Err(e)?, }; @@ -546,7 +556,8 @@ fn post_handler(srv: &Mutex, did: &str, collection: &str, tid: &str, tid: rkey, post_text: post["text"].as_str().unwrap().to_string(), // TODO: unwrap post_created_at: "some-time".to_string(), - }.render()?) + } + .render()?) } fn repo_handler(srv: &Mutex, did: &str, _request: &Request) -> Result { @@ -570,10 +581,16 @@ fn repo_handler(srv: &Mutex, did: &str, _request: &Request) -> Resul did: did, commit: commit, describe: desc, - }.render()?) + } + .render()?) } -fn collection_handler(srv: &Mutex, did: &str, collection: &str, _request: &Request) -> Result { +fn collection_handler( + srv: &Mutex, + did: &str, + collection: &str, + _request: &Request, +) -> Result { let did = Did::from_str(did)?; let collection = Nsid::from_str(collection)?; @@ -601,11 +618,17 @@ fn collection_handler(srv: &Mutex, did: &str, collection: &str, _req did: did, collection: collection, records: record_list, - }.render()?) + } + .render()?) } -fn record_handler(srv: &Mutex, did: &str, collection: &str, tid: &str, _request: &Request) -> Result { - +fn record_handler( + srv: &Mutex, + did: &str, + collection: &str, + tid: &str, + _request: &Request, +) -> Result { let did = Did::from_str(did)?; let collection = Nsid::from_str(collection)?; let rkey = Tid::from_str(tid)?; @@ -626,5 +649,6 @@ fn record_handler(srv: &Mutex, did: &str, collection: &str, tid: &st collection, tid: rkey, record, - }.render()?) + } + .render()?) } diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index 324d139..969f584 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -1,4 +1,3 @@ -use crate::load_car_to_blockstore; use anyhow::{anyhow, Context, Result}; use ipfs_sqlite_block_store::BlockStore; use libipld::cbor::DagCborCodec; @@ -297,50 +296,3 @@ fn serialize_wip_tree( db.put_block(block, None)?; Ok(cid) } - -pub fn repro_mst(car_path: &PathBuf) -> Result<()> { - // open a temp block store - let mut db: BlockStore = - { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? }; - - // load CAR contents from file - load_car_to_blockstore(&mut db, car_path, "repro-import")?; - - let all_aliases: Vec<(Vec, Cid)> = db.aliases()?; - if all_aliases.is_empty() { - error!("expected at least one alias in block store"); - std::process::exit(-1); - } - let (_alias, commit_cid) = all_aliases[0].clone(); - - let commit_node: CommitNode = DagCborCodec.decode( - &db.get_block(&commit_cid)? - .ok_or(anyhow!("expected commit block in store"))?, - )?; - let root_node: RootNode = DagCborCodec.decode( - &db.get_block(&commit_node.root)? - .ok_or(anyhow!("expected root block in store"))?, - )?; - let _metadata_node: MetadataNode = DagCborCodec.decode( - &db.get_block(&root_node.meta)? - .ok_or(anyhow!("expected metadata block in store"))?, - )?; - - // collect key/value sorted map of string/cid, as BTree - let mut repo_map: BTreeMap = BTreeMap::new(); - collect_mst_keys(&mut db, &root_node.data, &mut repo_map)?; - - // now re-generate nodes - let updated = generate_mst(&mut db, &repo_map)?; - - info!("original root: {}", root_node.data); - info!("regenerated : {}", updated); - if root_node.data == updated { - Ok(()) - } else { - println!("FAILED"); - let a = get_mst_node(&mut db, &root_node.data)?; - let b = get_mst_node(&mut db, &updated)?; - Err(anyhow!("FAILED to reproduce MST: {:?} != {:?}", a, b)) - } -} diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index 7b6f7e6..1b24be8 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -1,5 +1,8 @@ +use crate::car::{ + load_car_bytes_to_blockstore, load_car_path_to_blockstore, read_car_bytes_from_blockstore, +}; use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode}; -use crate::{load_car_to_blockstore, KeyPair}; +use crate::KeyPair; use adenosine_cli::identifiers::{Did, Nsid, Tid}; use anyhow::{anyhow, ensure, Context, Result}; use ipfs_sqlite_block_store::BlockStore; @@ -18,6 +21,7 @@ use std::str::FromStr; pub struct RepoCommit { pub sig: Box<[u8]>, pub commit_cid: Cid, + pub root_cid: Cid, pub did: Did, pub prev: Option, pub meta_cid: Cid, @@ -130,6 +134,7 @@ impl RepoStore { Ok(RepoCommit { sig: commit_node.sig, commit_cid: commit_cid.clone(), + root_cid: commit_node.root.clone(), meta_cid: root_node.meta, did: Did::from_str(&metadata_node.did)?, prev: root_node.prev, @@ -259,22 +264,73 @@ impl RepoStore { self.write_commit(&did, new_root_cid, &sig) } - /// returns the root commit from CAR file - pub fn load_car(&mut self, car_path: &PathBuf, alias: &str) -> Result { - let cid = load_car_to_blockstore(&mut self.db, car_path, alias)?; + /// Reads in a full MST tree starting at a repo commit, then re-builds and re-writes the tree + /// in to the repo, and verifies that both the MST root CIDs and the repo root CIDs are identical. + pub fn verify_repo_mst(&mut self, commit_cid: &Cid) -> Result<()> { + // load existing commit and MST tree + let existing_commit = self.get_commit(commit_cid)?; + let repo_map = self.mst_to_map(&existing_commit.mst_cid)?; + + // write MST tree, and verify root CID + let new_mst_cid = self.mst_from_map(&repo_map)?; + if new_mst_cid != existing_commit.mst_cid { + Err(anyhow!( + "MST root CID did not verify: {} != {}", + existing_commit.mst_cid, + new_mst_cid + ))?; + } + + let new_root_cid = + self.write_root(existing_commit.meta_cid, existing_commit.prev, new_mst_cid)?; + if new_root_cid != existing_commit.root_cid { + Err(anyhow!( + "repo root CID did not verify: {} != {}", + existing_commit.root_cid, + new_root_cid + ))?; + } + + Ok(()) + } + + /// Import blocks from a CAR file in memory, optionally setting an alias pointing to the input + /// (eg, a DID identifier). + /// + /// Does not currently do any validation of, eg, signatures. It is naive and incomplete to use + /// this to simply import CAR content from users, remote servers, etc. + /// + /// Returns the root commit from the CAR file, which may or may not actually be a "commit" + /// block. + pub fn import_car_bytes(&mut self, car_bytes: &[u8], alias: Option) -> Result { + let cid = load_car_bytes_to_blockstore(&mut self.db, car_bytes)?; + self.verify_repo_mst(&cid)?; + if let Some(alias) = alias { + self.db.alias(alias.as_bytes().to_vec(), Some(&cid))?; + } + Ok(cid) + } + + /// Similar to import_car_bytes(), but reads from a local file on disk instead of from memory. + pub fn import_car_path(&mut self, car_path: &PathBuf, alias: Option) -> Result { + let cid = load_car_path_to_blockstore(&mut self.db, car_path)?; + self.verify_repo_mst(&cid)?; + if let Some(alias) = alias { + self.db.alias(alias.as_bytes().to_vec(), Some(&cid))?; + } Ok(cid) } /// Exports in CAR format to a Writer /// /// The "from" commit CID feature is not implemented. - pub fn write_car( + pub fn export_car( &mut self, - _did: &Did, + commit_cid: &Cid, _from_commit_cid: Option<&Cid>, - _out: &mut W, - ) -> Result<()> { - unimplemented!() + ) -> Result> { + // TODO: from_commit_cid + read_car_bytes_from_blockstore(&mut self.db, &commit_cid) } } diff --git a/adenosine-pds/src/web.rs b/adenosine-pds/src/web.rs index 81e62be..e783b5a 100644 --- a/adenosine-pds/src/web.rs +++ b/adenosine-pds/src/web.rs @@ -1,9 +1,8 @@ - +use crate::models::*; +use crate::repo::RepoCommit; use adenosine_cli::identifiers::{Did, Nsid, Tid}; -use serde_json; use askama::Template; -use crate::repo::RepoCommit; -use crate::models::*; +use serde_json; #[derive(Template)] #[template(path = "home.html")] -- cgit v1.2.3