aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-11-07 17:20:28 -0800
committerBryan Newbold <bnewbold@robocracy.org>2022-11-07 17:20:28 -0800
commit9c8aa3d684575b5b5f169b8f6aca75919283d251 (patch)
tree58bc8ca8407b18e5d708d3b68c1f506e29e88be4
parent2f414df00e378728701e4061cdb3bebad5df798a (diff)
downloadadenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.tar.gz
adenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.zip
pds: implement CAR import/export at repo level
-rw-r--r--adenosine-pds/src/bin/adenosine-pds.rs4
-rw-r--r--adenosine-pds/src/car.rs86
-rw-r--r--adenosine-pds/src/lib.rs50
-rw-r--r--adenosine-pds/src/mst.rs48
-rw-r--r--adenosine-pds/src/repo.rs74
-rw-r--r--adenosine-pds/src/web.rs7
-rw-r--r--adenosine-pds/tests/test_repro_mst.rs23
7 files changed, 189 insertions, 103 deletions
diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs
index beb423a..f654dc1 100644
--- a/adenosine-pds/src/bin/adenosine-pds.rs
+++ b/adenosine-pds/src/bin/adenosine-pds.rs
@@ -105,7 +105,9 @@ fn main() -> Result<()> {
}
// TODO: handle alias
Command::Import { car_path, alias } => {
- load_car_to_sqlite(&opt.blockstore_db_path, &car_path, &alias)
+ let mut repo = RepoStore::open(&opt.blockstore_db_path)?;
+ repo.import_car_path(&car_path, Some(alias))?;
+ Ok(())
}
Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path),
Command::GenerateSecret {} => {
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs
index 43b4e1f..832c87e 100644
--- a/adenosine-pds/src/car.rs
+++ b/adenosine-pds/src/car.rs
@@ -1,50 +1,76 @@
use anyhow::Result;
-use crate::vendored::iroh_car::CarReader;
+use crate::vendored::iroh_car::{CarHeader, CarReader, CarWriter};
use futures::TryStreamExt;
use ipfs_sqlite_block_store::BlockStore;
use libipld::{Block, Cid};
use std::path::PathBuf;
use tokio::fs::File;
-use tokio::io::BufReader;
+use tokio::io::{AsyncRead, BufReader};
-pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> {
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open(db_path, Default::default())? };
-
- load_car_to_blockstore(&mut db, car_path, alias)?;
- Ok(())
+/// Synchronous wrapper for loading in-memory CAR bytes (`&[u8]`) into a blockstore.
+///
+/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file
+/// header.
+pub fn load_car_bytes_to_blockstore(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_bytes: &[u8],
+) -> Result<Cid> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?;
+ rt.block_on(inner_car_bytes_loader(db, car_bytes))
}
-pub fn load_car_to_blockstore(
+/// Synchronous wrapper for loading on-disk CAR file (by path) into a blockstore.
+///
+/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file
+/// header.
+pub fn load_car_path_to_blockstore(
db: &mut BlockStore<libipld::DefaultParams>,
car_path: &PathBuf,
- alias: &str,
) -> Result<Cid> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
- rt.block_on(inner_car_loader(db, car_path, alias))
+ rt.block_on(inner_car_path_loader(db, car_path))
+}
+
+pub fn read_car_bytes_from_blockstore(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ root: &Cid,
+) -> Result<Vec<u8>> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?;
+ rt.block_on(inner_car_bytes_reader(db, root))
}
-// this async function is wrapped in the sync version above
-async fn inner_car_loader(
+async fn inner_car_bytes_loader(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_bytes: &[u8],
+) -> Result<Cid> {
+ let car_reader = CarReader::new(car_bytes).await?;
+ inner_car_loader(db, car_reader).await
+}
+
+async fn inner_car_path_loader(
db: &mut BlockStore<libipld::DefaultParams>,
car_path: &PathBuf,
- alias: &str,
) -> Result<Cid> {
- println!(
- "{} - {}",
- std::env::current_dir()?.display(),
- car_path.display()
- );
let car_reader = {
let file = File::open(car_path).await?;
let buf_reader = BufReader::new(file);
CarReader::new(buf_reader).await?
};
- let car_header = car_reader.header().clone();
+ inner_car_loader(db, car_reader).await
+}
+async fn inner_car_loader<R: AsyncRead + Send + Unpin>(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_reader: CarReader<R>,
+) -> Result<Cid> {
+ let car_header = car_reader.header().clone();
car_reader
.stream()
.try_for_each(|(cid, raw)| {
@@ -54,11 +80,21 @@ async fn inner_car_loader(
futures::future::ready(Ok(()))
})
.await?;
+ Ok(car_header.roots()[0])
+}
- // pin the header (?)
- if !car_header.roots().is_empty() {
- db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?;
- }
+async fn inner_car_bytes_reader(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ root: &Cid,
+) -> Result<Vec<u8>> {
+ let car_header = CarHeader::new_v1(vec![root.clone()]);
+ let buf: Vec<u8> = Default::default();
+ let mut car_writer = CarWriter::new(car_header, buf);
- Ok(car_header.roots()[0])
+ let cid_list = db.get_descendants::<Vec<_>>(root)?;
+ for cid in cid_list {
+ let block = db.get_block(&cid)?.expect("block content");
+ car_writer.write(cid, block).await?;
+ }
+ Ok(car_writer.finish().await?)
}
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index ee281e1..009175d 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,5 +1,6 @@
use adenosine_cli::identifiers::{Did, Nsid, Tid, TidLord};
use anyhow::{anyhow, Result};
+use askama::Template;
use libipld::Cid;
use libipld::Ipld;
use log::{debug, error, info, warn};
@@ -10,7 +11,6 @@ use std::fmt;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Mutex;
-use askama::Template;
mod car;
mod crypto;
@@ -23,7 +23,6 @@ mod ucan_p256;
mod vendored;
mod web;
-pub use car::{load_car_to_blockstore, load_car_to_sqlite};
pub use crypto::{KeyPair, PubKey};
pub use db::AtpDatabase;
pub use models::*;
@@ -90,7 +89,10 @@ fn web_wrap(resp: Result<String>) -> Response {
None => 500,
};
warn!("HTTP {}: {}", code, msg);
- Response::html(format!("<html><body><h1>{}</h1><p>{}</body></html>", code, msg))
+ Response::html(format!(
+ "<html><body><h1>{}</h1><p>{}</body></html>",
+ code, msg
+ ))
}
}
}
@@ -521,10 +523,17 @@ fn profile_handler(srv: &Mutex<AtpService>, did: &str, _request: &Request) -> Re
did: did,
profile: json!({}),
feed: vec![],
- }.render()?)
+ }
+ .render()?)
}
-fn post_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str, _request: &Request) -> Result<String> {
+fn post_handler(
+ srv: &Mutex<AtpService>,
+ did: &str,
+ collection: &str,
+ tid: &str,
+ _request: &Request,
+) -> Result<String> {
let did = Did::from_str(did)?;
let collection = Nsid::from_str(collection)?;
let rkey = Tid::from_str(tid)?;
@@ -534,7 +543,8 @@ fn post_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str,
// TODO: format as JSON, not text debug
Ok(Some(ipld)) => ipld_into_json_value(ipld),
Ok(None) => Err(anyhow!(XrpcError::NotFound(format!(
- "could not find record: /{}/{}", collection, rkey
+ "could not find record: /{}/{}",
+ collection, rkey
))))?,
Err(e) => Err(e)?,
};
@@ -546,7 +556,8 @@ fn post_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str,
tid: rkey,
post_text: post["text"].as_str().unwrap().to_string(), // TODO: unwrap
post_created_at: "some-time".to_string(),
- }.render()?)
+ }
+ .render()?)
}
fn repo_handler(srv: &Mutex<AtpService>, did: &str, _request: &Request) -> Result<String> {
@@ -570,10 +581,16 @@ fn repo_handler(srv: &Mutex<AtpService>, did: &str, _request: &Request) -> Resul
did: did,
commit: commit,
describe: desc,
- }.render()?)
+ }
+ .render()?)
}
-fn collection_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, _request: &Request) -> Result<String> {
+fn collection_handler(
+ srv: &Mutex<AtpService>,
+ did: &str,
+ collection: &str,
+ _request: &Request,
+) -> Result<String> {
let did = Did::from_str(did)?;
let collection = Nsid::from_str(collection)?;
@@ -601,11 +618,17 @@ fn collection_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, _req
did: did,
collection: collection,
records: record_list,
- }.render()?)
+ }
+ .render()?)
}
-fn record_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &str, _request: &Request) -> Result<String> {
-
+fn record_handler(
+ srv: &Mutex<AtpService>,
+ did: &str,
+ collection: &str,
+ tid: &str,
+ _request: &Request,
+) -> Result<String> {
let did = Did::from_str(did)?;
let collection = Nsid::from_str(collection)?;
let rkey = Tid::from_str(tid)?;
@@ -626,5 +649,6 @@ fn record_handler(srv: &Mutex<AtpService>, did: &str, collection: &str, tid: &st
collection,
tid: rkey,
record,
- }.render()?)
+ }
+ .render()?)
}
diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs
index 324d139..969f584 100644
--- a/adenosine-pds/src/mst.rs
+++ b/adenosine-pds/src/mst.rs
@@ -1,4 +1,3 @@
-use crate::load_car_to_blockstore;
use anyhow::{anyhow, Context, Result};
use ipfs_sqlite_block_store::BlockStore;
use libipld::cbor::DagCborCodec;
@@ -297,50 +296,3 @@ fn serialize_wip_tree(
db.put_block(block, None)?;
Ok(cid)
}
-
-pub fn repro_mst(car_path: &PathBuf) -> Result<()> {
- // open a temp block store
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? };
-
- // load CAR contents from file
- load_car_to_blockstore(&mut db, car_path, "repro-import")?;
-
- let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?;
- if all_aliases.is_empty() {
- error!("expected at least one alias in block store");
- std::process::exit(-1);
- }
- let (_alias, commit_cid) = all_aliases[0].clone();
-
- let commit_node: CommitNode = DagCborCodec.decode(
- &db.get_block(&commit_cid)?
- .ok_or(anyhow!("expected commit block in store"))?,
- )?;
- let root_node: RootNode = DagCborCodec.decode(
- &db.get_block(&commit_node.root)?
- .ok_or(anyhow!("expected root block in store"))?,
- )?;
- let _metadata_node: MetadataNode = DagCborCodec.decode(
- &db.get_block(&root_node.meta)?
- .ok_or(anyhow!("expected metadata block in store"))?,
- )?;
-
- // collect key/value sorted map of string/cid, as BTree
- let mut repo_map: BTreeMap<String, Cid> = BTreeMap::new();
- collect_mst_keys(&mut db, &root_node.data, &mut repo_map)?;
-
- // now re-generate nodes
- let updated = generate_mst(&mut db, &repo_map)?;
-
- info!("original root: {}", root_node.data);
- info!("regenerated : {}", updated);
- if root_node.data == updated {
- Ok(())
- } else {
- println!("FAILED");
- let a = get_mst_node(&mut db, &root_node.data)?;
- let b = get_mst_node(&mut db, &updated)?;
- Err(anyhow!("FAILED to reproduce MST: {:?} != {:?}", a, b))
- }
-}
diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs
index 7b6f7e6..1b24be8 100644
--- a/adenosine-pds/src/repo.rs
+++ b/adenosine-pds/src/repo.rs
@@ -1,5 +1,8 @@
+use crate::car::{
+ load_car_bytes_to_blockstore, load_car_path_to_blockstore, read_car_bytes_from_blockstore,
+};
use crate::mst::{collect_mst_keys, generate_mst, CommitNode, MetadataNode, RootNode};
-use crate::{load_car_to_blockstore, KeyPair};
+use crate::KeyPair;
use adenosine_cli::identifiers::{Did, Nsid, Tid};
use anyhow::{anyhow, ensure, Context, Result};
use ipfs_sqlite_block_store::BlockStore;
@@ -18,6 +21,7 @@ use std::str::FromStr;
pub struct RepoCommit {
pub sig: Box<[u8]>,
pub commit_cid: Cid,
+ pub root_cid: Cid,
pub did: Did,
pub prev: Option<Cid>,
pub meta_cid: Cid,
@@ -130,6 +134,7 @@ impl RepoStore {
Ok(RepoCommit {
sig: commit_node.sig,
commit_cid: commit_cid.clone(),
+ root_cid: commit_node.root.clone(),
meta_cid: root_node.meta,
did: Did::from_str(&metadata_node.did)?,
prev: root_node.prev,
@@ -259,22 +264,73 @@ impl RepoStore {
self.write_commit(&did, new_root_cid, &sig)
}
- /// returns the root commit from CAR file
- pub fn load_car(&mut self, car_path: &PathBuf, alias: &str) -> Result<Cid> {
- let cid = load_car_to_blockstore(&mut self.db, car_path, alias)?;
+ /// Reads in a full MST tree starting at a repo commit, then re-builds and re-writes the tree
+ /// in to the repo, and verifies that both the MST root CIDs and the repo root CIDs are identical.
+ pub fn verify_repo_mst(&mut self, commit_cid: &Cid) -> Result<()> {
+ // load existing commit and MST tree
+ let existing_commit = self.get_commit(commit_cid)?;
+ let repo_map = self.mst_to_map(&existing_commit.mst_cid)?;
+
+ // write MST tree, and verify root CID
+ let new_mst_cid = self.mst_from_map(&repo_map)?;
+ if new_mst_cid != existing_commit.mst_cid {
+ Err(anyhow!(
+ "MST root CID did not verify: {} != {}",
+ existing_commit.mst_cid,
+ new_mst_cid
+ ))?;
+ }
+
+ let new_root_cid =
+ self.write_root(existing_commit.meta_cid, existing_commit.prev, new_mst_cid)?;
+ if new_root_cid != existing_commit.root_cid {
+ Err(anyhow!(
+ "repo root CID did not verify: {} != {}",
+ existing_commit.root_cid,
+ new_root_cid
+ ))?;
+ }
+
+ Ok(())
+ }
+
+ /// Import blocks from a CAR file in memory, optionally setting an alias pointing to the input
+ /// (eg, a DID identifier).
+ ///
+ /// Does not currently do any validation of, eg, signatures. It is naive and incomplete to use
+ /// this to simply import CAR content from users, remote servers, etc.
+ ///
+ /// Returns the root commit from the CAR file, which may or may not actually be a "commit"
+ /// block.
+ pub fn import_car_bytes(&mut self, car_bytes: &[u8], alias: Option<String>) -> Result<Cid> {
+ let cid = load_car_bytes_to_blockstore(&mut self.db, car_bytes)?;
+ self.verify_repo_mst(&cid)?;
+ if let Some(alias) = alias {
+ self.db.alias(alias.as_bytes().to_vec(), Some(&cid))?;
+ }
+ Ok(cid)
+ }
+
+ /// Similar to import_car_bytes(), but reads from a local file on disk instead of from memory.
+ pub fn import_car_path(&mut self, car_path: &PathBuf, alias: Option<String>) -> Result<Cid> {
+ let cid = load_car_path_to_blockstore(&mut self.db, car_path)?;
+ self.verify_repo_mst(&cid)?;
+ if let Some(alias) = alias {
+ self.db.alias(alias.as_bytes().to_vec(), Some(&cid))?;
+ }
Ok(cid)
}
/// Exports in CAR format to a Writer
///
/// The "from" commit CID feature is not implemented.
- pub fn write_car<W: std::io::Write>(
+ pub fn export_car(
&mut self,
- _did: &Did,
+ commit_cid: &Cid,
_from_commit_cid: Option<&Cid>,
- _out: &mut W,
- ) -> Result<()> {
- unimplemented!()
+ ) -> Result<Vec<u8>> {
+ // TODO: from_commit_cid
+ read_car_bytes_from_blockstore(&mut self.db, &commit_cid)
}
}
diff --git a/adenosine-pds/src/web.rs b/adenosine-pds/src/web.rs
index 81e62be..e783b5a 100644
--- a/adenosine-pds/src/web.rs
+++ b/adenosine-pds/src/web.rs
@@ -1,9 +1,8 @@
-
+use crate::models::*;
+use crate::repo::RepoCommit;
use adenosine_cli::identifiers::{Did, Nsid, Tid};
-use serde_json;
use askama::Template;
-use crate::repo::RepoCommit;
-use crate::models::*;
+use serde_json;
#[derive(Template)]
#[template(path = "home.html")]
diff --git a/adenosine-pds/tests/test_repro_mst.rs b/adenosine-pds/tests/test_repro_mst.rs
index 9a23c03..df88559 100644
--- a/adenosine-pds/tests/test_repro_mst.rs
+++ b/adenosine-pds/tests/test_repro_mst.rs
@@ -1,9 +1,26 @@
-use adenosine_pds::mst::repro_mst;
+use adenosine_pds::RepoStore;
use std::path::PathBuf;
use std::str::FromStr;
#[test]
fn test_repro_mst() {
- repro_mst(&PathBuf::from_str("./tests/example_repo.car").unwrap()).unwrap();
- repro_mst(&PathBuf::from_str("./tests/bigger.car").unwrap()).unwrap();
+ let mut repo = RepoStore::open_ephemeral().unwrap();
+ let cid = repo
+ .import_car_path(
+ &PathBuf::from_str("./tests/example_repo.car").unwrap(),
+ None,
+ )
+ .unwrap();
+ repo.verify_repo_mst(&cid).unwrap();
+ let cid = repo
+ .import_car_path(&PathBuf::from_str("./tests/bigger.car").unwrap(), None)
+ .unwrap();
+ repo.verify_repo_mst(&cid).unwrap();
+
+ // test round-tripping from export
+ let car_bytes = repo.export_car(&cid, None).unwrap();
+ let mut other_repo = RepoStore::open_ephemeral().unwrap();
+ let other_cid = other_repo.import_car_bytes(&car_bytes, None).unwrap();
+ other_repo.verify_repo_mst(&cid).unwrap();
+ assert_eq!(cid, other_cid);
}