diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-07 17:20:28 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-07 17:20:28 -0800 | 
| commit | 9c8aa3d684575b5b5f169b8f6aca75919283d251 (patch) | |
| tree | 58bc8ca8407b18e5d708d3b68c1f506e29e88be4 /adenosine-pds/src | |
| parent | 2f414df00e378728701e4061cdb3bebad5df798a (diff) | |
| download | adenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.tar.gz adenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.zip  | |
pds: implement CAR import/export at repo level
Diffstat (limited to 'adenosine-pds/src')
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 4 | ||||
| -rw-r--r-- | adenosine-pds/src/car.rs | 86 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 50 | ||||
| -rw-r--r-- | adenosine-pds/src/mst.rs | 48 | ||||
| -rw-r--r-- | adenosine-pds/src/repo.rs | 74 | ||||
| -rw-r--r-- | adenosine-pds/src/web.rs | 7 | 
6 files changed, 169 insertions, 100 deletions
diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index beb423a..f654dc1 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -105,7 +105,9 @@ fn main() -> Result<()> {          }          // TODO: handle alias          Command::Import { car_path, alias } => { -            load_car_to_sqlite(&opt.blockstore_db_path, &car_path, &alias) +            let mut repo = RepoStore::open(&opt.blockstore_db_path)?; +            repo.import_car_path(&car_path, Some(alias))?; +            Ok(())          }          Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path),          Command::GenerateSecret {} => { diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs index 43b4e1f..832c87e 100644 --- a/adenosine-pds/src/car.rs +++ b/adenosine-pds/src/car.rs @@ -1,50 +1,76 @@  use anyhow::Result; -use crate::vendored::iroh_car::CarReader; +use crate::vendored::iroh_car::{CarHeader, CarReader, CarWriter};  use futures::TryStreamExt;  use ipfs_sqlite_block_store::BlockStore;  use libipld::{Block, Cid};  use std::path::PathBuf;  use tokio::fs::File; -use tokio::io::BufReader; +use tokio::io::{AsyncRead, BufReader}; -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> { -    let mut db: BlockStore<libipld::DefaultParams> = -        { BlockStore::open(db_path, Default::default())? }; - -    load_car_to_blockstore(&mut db, car_path, alias)?; -    Ok(()) +/// Synchronous wrapper for loading in-memory CAR bytes (`&[u8]`) into a blockstore. +/// +/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file +/// header. +pub fn load_car_bytes_to_blockstore( +    db: &mut BlockStore<libipld::DefaultParams>, +    car_bytes: &[u8], +) -> Result<Cid> { +    let rt = tokio::runtime::Builder::new_current_thread() +        .enable_all() +        .build()?; +    rt.block_on(inner_car_bytes_loader(db, car_bytes))  } -pub fn load_car_to_blockstore( +/// Synchronous wrapper for loading on-disk CAR file (by path) into a blockstore. +/// +/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file +/// header. +pub fn load_car_path_to_blockstore(      db: &mut BlockStore<libipld::DefaultParams>,      car_path: &PathBuf, -    alias: &str,  ) -> Result<Cid> {      let rt = tokio::runtime::Builder::new_current_thread()          .enable_all()          .build()?; -    rt.block_on(inner_car_loader(db, car_path, alias)) +    rt.block_on(inner_car_path_loader(db, car_path)) +} + +pub fn read_car_bytes_from_blockstore( +    db: &mut BlockStore<libipld::DefaultParams>, +    root: &Cid, +) -> Result<Vec<u8>> { +    let rt = tokio::runtime::Builder::new_current_thread() +        .enable_all() +        .build()?; +    rt.block_on(inner_car_bytes_reader(db, root))  } -// this async function is wrapped in the sync version above -async fn inner_car_loader( +async fn inner_car_bytes_loader( +    db: &mut BlockStore<libipld::DefaultParams>, +    car_bytes: &[u8], +) -> Result<Cid> { +    let car_reader = CarReader::new(car_bytes).await?; +    inner_car_loader(db, car_reader).await +} + +async fn inner_car_path_loader(      db: &mut BlockStore<libipld::DefaultParams>,      car_path: &PathBuf, -    alias: &str,  ) -> Result<Cid> { -    println!( -        "{} - {}", -        std::env::current_dir()?.display(), -        car_path.display() -    );      let car_reader = {          let file = File::open(car_path).await?;          let buf_reader = BufReader::new(file);          CarReader::new(buf_reader).await?      }; -    let car_header = car_reader.header().clone(); +    inner_car_loader(db, car_reader).await +} +async fn inner_car_loader<R: AsyncRead + Send + Unpin>( +    db: &mut BlockStore<libipld::DefaultParams>, +    car_reader: CarReader<R>, +) -> Result<Cid> { +    let car_header = car_reader.header().clone();      car_reader          .stream()          .try_for_each(|(cid, raw)| { @@ -54,11 +80,21 @@ async fn inner_car_loader(              futures::future::ready(Ok(()))          })          .await?; +    Ok(car_header.roots()[0]) +} -    // pin the header (?) -    if !car_header.roots().is_empty() { -        db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?; -    } +async fn inner_car_bytes_reader( +    db: &mut BlockStore<libipld::DefaultParams>, +    root: &Cid, +) -> Result<Vec<u8>> { +    let car_header = CarHeader::new_v1(vec![root.clone()]); +    let buf: Vec<u8> = Default::default(); +    let mut car_writer = CarWriter::new(car_header, buf); -    Ok(car_header.roots()[0]) +    let cid_list = db.get_descendants::<Vec<_>>(root)?; +    for cid in cid_list { +        let block = db.get_block(&cid)?.expect("block content"); +        car_writer.write(cid, block).await?; +    } +    Ok(car_writer.finish().await?)  } diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index ee281e1..009175d 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,5 +1,6 @@  use adenosine_cli::identifiers::{Did, Nsid, Tid, TidLord};  use anyhow::{anyhow, Result}; +use askama::Template;  use libipld::Cid;  use libipld::Ipld;  use log::{debug, error, info, warn}; @@ -10,7 +11,6 @@ use std::fmt;  use std::path::PathBuf;  use std::str::FromStr;  use std::sync::Mutex; -use askama::Template;  mod car;  mod crypto; @@ -23,7 +23,6 @@ mod ucan_p256;  mod vendored;  mod web; -pub use car::{load_car_to_blockstore, load_car_to_sqlite};  pub use crypto::{KeyPair, PubKey};  pub use db::AtpDatabase;  pub use models::*; @@ -90,7 +89,10 @@ fn web_wrap(resp: Result<String>) -> Response {                  None => 500,              };              warn!("HTTP {}: {}", code, msg); -            Response::html(format!("<html><body><h1>{}</h1><p>{}</body></html>", code, msg)) +            Response::html(format!( +                "<html><body><h1>{}</h1><p>{}</body></html>", +                code, msg +            ))          }      }  } @@ -521,10 +523,17 @@ fn profile_handler(srv: &Mutex<AtpService>, did: &str, _request: &Request) -> Re          did: did,          profile: json!({}),          feed: vec![], -    }.render()?) +    } +    .render()?)  } -fn post_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str, _request: &Request) -> Result<String> { +fn post_handler( +    srv: &Mutex<AtpService>, +    did: &str, +    collection: &str, +    tid: &str, +    _request: &Request, +) -> Result<String> {      let did = Did::from_str(did)?;      let collection = Nsid::from_str(collection)?;      let rkey = Tid::from_str(tid)?; @@ -534,7 +543,8 @@ fn post_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str,          // TODO: format as JSON, not text debug          Ok(Some(ipld)) => ipld_into_json_value(ipld),          Ok(None) => Err(anyhow!(XrpcError::NotFound(format!( -            "could not find record: /{}/{}", collection, rkey +            "could not find record: /{}/{}", +            collection, rkey          ))))?,          Err(e) => Err(e)?,      }; @@ -546,7 +556,8 @@ fn post_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str,          tid: rkey,          post_text: post["text"].as_str().unwrap().to_string(), // TODO: unwrap          post_created_at: "some-time".to_string(), -    }.render()?) +    } +    .render()?)  }  fn repo_handler(srv: &Mutex<AtpService>, did: &str, _request: &Request) -> Result<String> { @@ -570,10 +581,16 @@ fn repo_handler(srv: &Mutex<AtpService>, did: &str, _request: &Request) -> Resul          did: did,          commit: commit,          describe: desc, -    }.render()?) +    } +    .render()?)  } -fn collection_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, _request: &Request) -> Result<String> { +fn collection_handler( +    srv: &Mutex<AtpService>, +    did: &str, +    collection: &str, +    _request: &Request, +) -> Result<String> {      let did = Did::from_str(did)?;      let collection = Nsid::from_str(collection)?; @@ -601,11 +618,17 @@ fn collection_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, _req          did: did,          collection: collection,          records: record_list, -    }.render()?) +    } +    .render()?)  } -fn record_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str, _request: &Request) -> Result<String> { - +fn record_handler( +    srv: &Mutex<AtpService>, +    did: &str, +    collection: &str, +    tid: &str, +    _request: &Request, +) -> Result<String> {      let did = Did::from_str(did)?;      let collection = Nsid::from_str(collection)?;      let rkey = Tid::from_str(tid)?; @@ -626,5 +649,6 @@ fn record_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &st          collection,          tid: rkey,          record, -    }.render()?) +    } +    .render()?)  } diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index 324d139..969f584 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -1,4 +1,3 @@ -use crate::load_car_to_blockstore;  use anyhow::{anyhow, Context, Result};  use ipfs_sqlite_block_store::BlockStore;  use libipld::cbor::DagCborCodec; @@ -297,50 +296,3 @@ fn serialize_wip_tree(      db.put_block(block, None)?;      Ok(cid)  } - -pub fn repro_mst(car_path: &PathBuf) -> Result<()> { -    // open a temp block store -    let mut db: BlockStore<libipld::DefaultParams> = -        { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? }; - -    // load CAR contents from file -    load_car_to_blockstore(&mut db, car_path, "repro-import")?; - -    let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; -    if all_aliases.is_empty() { -        error!("expected at least one alias in block store"); -        std::process::exit(-1); -    } -    let (_alias, commit_cid) = all_aliases[0].clone(); - -    let commit_node: CommitNode = DagCborCodec.decode( -        &db.get_block(&commit_cid)? -            .ok_or(anyhow!("expected commit block in store"))?, -    )?; -    let root_node: RootNode = DagCborCodec.decode( -        &db.get_block(&commit_node.root)? -            .ok_or(anyhow!("expected root block in store"))?, -    )?; -    let _metadata_node: MetadataNode = DagCborCodec.decode( -        &db.get_block(&root_node.meta)? -            .ok_or(anyhow!("expected metadata block in store"))?, -    )?; - -    // collect key/value sorted map of string/cid, as BTree -    let mut repo_map: BTreeMap<String, Cid> = BTreeMap::new(); -    collect_mst_keys(&mut db, &root_node.data, &mut repo_map)?; - -    // now re-generate nodes -    let updated = generate_mst(&mut db, &repo_map)?; - -    info!("original root: {}", root_node.data); -    info!("regenerated  : {}", updated); -    if root_node.data == updated { -        Ok(()) -    } else { -        println!("FAILED"); -        let a = get_mst_node(&mut db, &root_node.data)?; -        let b = get_mst_node(&mut db, &updated)?; -        Err(anyhow!("FAILED to reproduce MST: {:?} != {:?}", a, b)) -    } -} diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index 7b6f7e6..1b24be8 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -1,5 +1,8 @@ +use crate::car::{ +    load_car_bytes_to_blockstore, load_car_path_to_blockstore, read_car_bytes_from_blockstore, +};  use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode}; -use crate::{load_car_to_blockstore, KeyPair}; +use crate::KeyPair;  use adenosine_cli::identifiers::{Did, Nsid, Tid};  use anyhow::{anyhow, ensure, Context, Result};  use ipfs_sqlite_block_store::BlockStore; @@ -18,6 +21,7 @@ use std::str::FromStr;  pub struct RepoCommit {      pub sig: Box<[u8]>,      pub commit_cid: Cid, +    pub root_cid: Cid,      pub did: Did,      pub prev: Option<Cid>,      pub meta_cid: Cid, @@ -130,6 +134,7 @@ impl RepoStore {          Ok(RepoCommit {              sig: commit_node.sig,              commit_cid: commit_cid.clone(), +            root_cid: commit_node.root.clone(),              meta_cid: root_node.meta,              did: Did::from_str(&metadata_node.did)?,              prev: root_node.prev, @@ -259,22 +264,73 @@ impl RepoStore {          self.write_commit(&did, new_root_cid, &sig)      } -    /// returns the root commit from CAR file -    pub fn load_car(&mut self, car_path: &PathBuf, alias: &str) -> Result<Cid> { -        let cid = load_car_to_blockstore(&mut self.db, car_path, alias)?; +    /// Reads in a full MST tree starting at a repo commit, then re-builds and re-writes the tree +    /// in to the repo, and verifies that both the MST root CIDs and the repo root CIDs are identical. +    pub fn verify_repo_mst(&mut self, commit_cid: &Cid) -> Result<()> { +        // load existing commit and MST tree +        let existing_commit = self.get_commit(commit_cid)?; +        let repo_map = self.mst_to_map(&existing_commit.mst_cid)?; + +        // write MST tree, and verify root CID +        let new_mst_cid = self.mst_from_map(&repo_map)?; +        if new_mst_cid != existing_commit.mst_cid { +            Err(anyhow!( +                "MST root CID did not verify: {} != {}", +                existing_commit.mst_cid, +                new_mst_cid +            ))?; +        } + +        let new_root_cid = +            self.write_root(existing_commit.meta_cid, existing_commit.prev, new_mst_cid)?; +        if new_root_cid != existing_commit.root_cid { +            Err(anyhow!( +                "repo root CID did not verify: {} != {}", +                existing_commit.root_cid, +                new_root_cid +            ))?; +        } + +        Ok(()) +    } + +    /// Import blocks from a CAR file in memory, optionally setting an alias pointing to the input +    /// (eg, a DID identifier). +    /// +    /// Does not currently do any validation of, eg, signatures. It is naive and incomplete to use +    /// this to simply import CAR content from users, remote servers, etc. +    /// +    /// Returns the root commit from the CAR file, which may or may not actually be a "commit" +    /// block. +    pub fn import_car_bytes(&mut self, car_bytes: &[u8], alias: Option<String>) -> Result<Cid> { +        let cid = load_car_bytes_to_blockstore(&mut self.db, car_bytes)?; +        self.verify_repo_mst(&cid)?; +        if let Some(alias) = alias { +            self.db.alias(alias.as_bytes().to_vec(), Some(&cid))?; +        } +        Ok(cid) +    } + +    /// Similar to import_car_bytes(), but reads from a local file on disk instead of from memory. +    pub fn import_car_path(&mut self, car_path: &PathBuf, alias: Option<String>) -> Result<Cid> { +        let cid = load_car_path_to_blockstore(&mut self.db, car_path)?; +        self.verify_repo_mst(&cid)?; +        if let Some(alias) = alias { +            self.db.alias(alias.as_bytes().to_vec(), Some(&cid))?; +        }          Ok(cid)      }      /// Exports in CAR format to a Writer      ///      /// The "from" commit CID feature is not implemented. -    pub fn write_car<W: std::io::Write>( +    pub fn export_car(          &mut self, -        _did: &Did, +        commit_cid: &Cid,          _from_commit_cid: Option<&Cid>, -        _out: &mut W, -    ) -> Result<()> { -        unimplemented!() +    ) -> Result<Vec<u8>> { +        // TODO: from_commit_cid +        read_car_bytes_from_blockstore(&mut self.db, &commit_cid)      }  } diff --git a/adenosine-pds/src/web.rs b/adenosine-pds/src/web.rs index 81e62be..e783b5a 100644 --- a/adenosine-pds/src/web.rs +++ b/adenosine-pds/src/web.rs @@ -1,9 +1,8 @@ - +use crate::models::*; +use crate::repo::RepoCommit;  use adenosine_cli::identifiers::{Did, Nsid, Tid}; -use serde_json;  use askama::Template; -use crate::repo::RepoCommit; -use crate::models::*; +use serde_json;  #[derive(Template)]  #[template(path = "home.html")]  | 
