diff options
Diffstat (limited to 'adenosine-pds/src')
-rw-r--r-- | adenosine-pds/src/bin/adenosine-pds-dump-mst.rs | 129 | ||||
-rw-r--r-- | adenosine-pds/src/bin/adenosine-pds-import.rs | 53 | ||||
-rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 33 | ||||
-rw-r--r-- | adenosine-pds/src/lib.rs | 71 | ||||
-rw-r--r-- | adenosine-pds/src/mst.rs (renamed from adenosine-pds/src/bin/adenosine-pds-repro.rs) | 112 |
5 files changed, 162 insertions, 236 deletions
diff --git a/adenosine-pds/src/bin/adenosine-pds-dump-mst.rs b/adenosine-pds/src/bin/adenosine-pds-dump-mst.rs deleted file mode 100644 index 1f1b947..0000000 --- a/adenosine-pds/src/bin/adenosine-pds-dump-mst.rs +++ /dev/null @@ -1,129 +0,0 @@ -/// Helper program to print MST keys/docs from a sqlite repo -use anyhow::{anyhow, Result}; -use ipfs_sqlite_block_store::BlockStore; -use libipld::cbor::DagCborCodec; -use libipld::prelude::Codec; -use libipld::{Cid, DagCbor}; - -use std::env; - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct CommitNode { - root: Cid, - sig: Box<[u8]>, -} - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct RootNode { - auth_token: Option<String>, - prev: Option<Cid>, - // TODO: not 'metadata'? - meta: Cid, - data: Cid, -} - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct MetadataNode { - datastore: String, // "mst" - did: String, - version: u8, // 1 -} - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct MstEntry { - k: String, - p: u32, - v: Cid, - t: Option<Cid>, -} - -#[derive(Debug, DagCbor, PartialEq)] -struct MstNode { - l: Option<Cid>, - e: Vec<MstEntry>, -} - -fn get_mst_node(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Result<MstNode> { - let mst_node: MstNode = DagCborCodec.decode( - &db.get_block(cid)? - .ok_or(anyhow!("expected block in store"))?, - )?; - Ok(mst_node) -} - -fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Result<()> { - let node = get_mst_node(db, cid)?; - if let Some(ref left) = node.l { - print_mst_keys(db, left)?; - } - let mut key: String = "".to_string(); - for entry in node.e.iter() { - key = format!("{}{}", &key[0..entry.p as usize], entry.k); - println!("{}\t-> {}", key, entry.v); - if let Some(ref right) = entry.t { - print_mst_keys(db, right)?; - } - } - Ok(()) -} - -async fn dump_mst_keys(db_path: &str) -> Result<()> { - let mut db: BlockStore<libipld::DefaultParams> = { - let path = std::path::PathBuf::from(db_path); - let path = ipfs_sqlite_block_store::DbPath::File(path); - BlockStore::open_path(path, Default::default())? - }; - - let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; - if all_aliases.is_empty() { - println!("expected at least one alias in block store"); - std::process::exit(-1); - } - let (alias, commit_cid) = all_aliases[0].clone(); - println!( - "starting from {} [{}]", - commit_cid, - String::from_utf8_lossy(&alias) - ); - - // NOTE: the faster way to develop would have been to decode to libipld::ipld::Ipld first? meh - - //println!("raw commit: {:?}", &db.get_block(&commit_cid)?.ok_or(anyhow!("expected commit block in store"))?); - let commit: CommitNode = DagCborCodec.decode( - &db.get_block(&commit_cid)? - .ok_or(anyhow!("expected commit block in store"))?, - )?; - println!("Commit: {:?}", commit); - //println!("raw root: {:?}", &db.get_block(&commit.root)?.ok_or(anyhow!("expected commit block in store"))?); - let root: RootNode = DagCborCodec.decode( - &db.get_block(&commit.root)? - .ok_or(anyhow!("expected root block in store"))?, - )?; - println!("Root: {:?}", root); - let metadata: MetadataNode = DagCborCodec.decode( - &db.get_block(&root.meta)? - .ok_or(anyhow!("expected metadata block in store"))?, - )?; - println!("Metadata: {:?}", metadata); - let mst_node: MstNode = DagCborCodec.decode( - &db.get_block(&root.data)? - .ok_or(anyhow!("expected block in store"))?, - )?; - println!("MST root node: {:?}", mst_node); - - println!("============"); - - print_mst_keys(&mut db, &root.data)?; - Ok(()) -} - -#[tokio::main] -async fn main() -> Result<()> { - let args: Vec<String> = env::args().collect(); - if args.len() != 2 { - println!("expected 1 args: <db_path>"); - std::process::exit(-1); - } - let db_path = &args[1]; - dump_mst_keys(db_path).await -} diff --git a/adenosine-pds/src/bin/adenosine-pds-import.rs b/adenosine-pds/src/bin/adenosine-pds-import.rs deleted file mode 100644 index 37ea6eb..0000000 --- a/adenosine-pds/src/bin/adenosine-pds-import.rs +++ /dev/null @@ -1,53 +0,0 @@ -/// Helper program to import an IPLD CARv1 file in to sqlite data store -use anyhow::{anyhow, Result}; -use futures::TryStreamExt; -use ipfs_sqlite_block_store::BlockStore; -use iroh_car::CarReader; -use libipld::block::Block; -use tokio::fs::File; -use tokio::io::BufReader; - -use std::env; - -async fn load_car_to_sqlite(db_path: &str, car_path: &str) -> Result<()> { - let car_reader = { - let file = File::open(car_path).await?; - let buf_reader = BufReader::new(file); - CarReader::new(buf_reader).await? - }; - let car_header = car_reader.header().clone(); - let mut db: BlockStore<libipld::DefaultParams> = { - let path = std::path::PathBuf::from(db_path); - let path = ipfs_sqlite_block_store::DbPath::File(path); - BlockStore::open_path(path, Default::default())? - }; - - car_reader - .stream() - .try_for_each(|(cid, raw)| { - // TODO: error handling here instead of unwrap (?) - let block = Block::new(cid, raw).unwrap(); - db.put_block(block, None).unwrap(); - futures::future::ready(Ok(())) - }) - .await?; - - // pin the header - if car_header.roots().len() >= 1 { - db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; - } - - Ok(()) -} - -#[tokio::main] -async fn main() -> Result<()> { - let args: Vec<String> = env::args().collect(); - if args.len() != 3 { - println!("expected 2 args: <db_path> <car_path>"); - std::process::exit(-1); - } - let db_path = &args[1]; - let car_path = &args[2]; - load_car_to_sqlite(db_path, car_path).await -} diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index 5d0e638..44c4cef 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -4,7 +4,6 @@ use anyhow::Result; use log::{self, debug}; use structopt::StructOpt; - #[derive(StructOpt)] #[structopt( rename_all = "kebab-case", @@ -12,7 +11,6 @@ use structopt::StructOpt; )] struct Opt { // TODO: different path type for structopt? - /// File path of sqlite database for storing IPLD blocks (aka, repository content) #[structopt( parse(from_os_str), @@ -44,12 +42,18 @@ struct Opt { #[derive(StructOpt)] enum Command { /// Start ATP server as a foreground process - Serve, + Serve { + #[structopt(long, default_value = "3030")] + port: u16, + }, - /// Import a CAR file (TODO) - Import, + /// Helper to import an IPLD CARv1 file in to sqlite data store + Import { + /// CARv1 file path to import from + car_path: std::path::PathBuf, + }, - /// Dump info from databases (TODO) + /// Helper to print MST keys/docs from a sqlite repo Inspect, } @@ -74,16 +78,13 @@ fn main() -> Result<()> { debug!("config parsed, starting up"); match opt.cmd { - Command::Serve {} => { + Command::Serve { port } => { // TODO: log some config stuff? - run_server() - }, - Command::Import {} => { - unimplemented!() - }, - Command::Inspect {} => { - unimplemented!() - }, + run_server(port) + } + Command::Import { car_path } => { + load_car_to_sqlite(&opt.blockstore_db_path, &car_path) + } + Command::Inspect {} => dump_mst_keys(&opt.blockstore_db_path), } } - diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index f321d3f..1785640 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,20 +1,35 @@ - use anyhow::Result; -use log::{info, error}; -use rouille::{Request, Response, router}; +use log::{error, info}; +use rouille::{router, Request, Response}; + +use futures::TryStreamExt; +use ipfs_sqlite_block_store::BlockStore; +use iroh_car::CarReader; +use libipld::block::Block; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::BufReader; -pub fn run_server() -> Result<()> { +mod mst; +pub use mst::{dump_mst_keys, repro_mst}; + +pub fn run_server(port: u16) -> Result<()> { // TODO: log access requests // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs - let log_ok = |req: &Request, resp: &Response, elap: std::time::Duration| { + let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { info!("{} {} ({:?})", req.method(), req.raw_url(), elap); }; let log_err = |req: &Request, elap: std::time::Duration| { - error!("HTTP handler panicked: {} {} ({:?})", req.method(), req.raw_url(), elap); + error!( + "HTTP handler panicked: {} {} ({:?})", + req.method(), + req.raw_url(), + elap + ); }; - rouille::start_server("localhost:3030", move |request| { + rouille::start_server(format!("localhost:{}", port), move |request| { rouille::log_custom(request, log_ok, log_err, || { router!(request, (GET) ["/"] => { @@ -33,3 +48,45 @@ pub fn run_server() -> Result<()> { }) }); } + +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { + let mut db: BlockStore<libipld::DefaultParams> = + { BlockStore::open(db_path, Default::default())? }; + + load_car_to_blockstore(&mut db, car_path) +} + +pub fn load_car_to_blockstore(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_loader(db, car_path)) +} + +// this async function is wrapped in the sync version above +async fn inner_car_loader(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> { + println!("{} - {}", std::env::current_dir()?.display(), car_path.display()); + let car_reader = { + let file = File::open(car_path).await?; + let buf_reader = BufReader::new(file); + CarReader::new(buf_reader).await? + }; + let car_header = car_reader.header().clone(); + + car_reader + .stream() + .try_for_each(|(cid, raw)| { + // TODO: error handling here instead of unwrap (?) + let block = Block::new(cid, raw).unwrap(); + db.put_block(block, None).unwrap(); + futures::future::ready(Ok(())) + }) + .await?; + + // pin the header + if car_header.roots().len() >= 1 { + db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; + } + + Ok(()) +} diff --git a/adenosine-pds/src/bin/adenosine-pds-repro.rs b/adenosine-pds/src/mst.rs index 05048ce..a2a394f 100644 --- a/adenosine-pds/src/bin/adenosine-pds-repro.rs +++ b/adenosine-pds/src/mst.rs @@ -1,5 +1,3 @@ -/// Development helper which loads MST keys and CIDs, re-generates MST structure, then compares -/// root node with what was originally found. use anyhow::{anyhow, Result}; use ipfs_sqlite_block_store::BlockStore; use libipld::cbor::DagCborCodec; @@ -8,9 +6,10 @@ use libipld::prelude::Codec; use libipld::store::DefaultParams; use libipld::Block; use libipld::{Cid, DagCbor}; +use log::{debug, error, info}; use std::collections::BTreeMap; - -use std::env; +use std::path::PathBuf; +use crate::load_car_to_blockstore; #[derive(Debug, DagCbor, PartialEq, Eq)] struct CommitNode { @@ -69,6 +68,71 @@ fn get_mst_node(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Resul Ok(mst_node) } +fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Result<()> { + let node = get_mst_node(db, cid)?; + if let Some(ref left) = node.l { + print_mst_keys(db, left)?; + } + let mut key: String = "".to_string(); + for entry in node.e.iter() { + key = format!("{}{}", &key[0..entry.p as usize], entry.k); + println!("{}\t-> {}", key, entry.v); + if let Some(ref right) = entry.t { + print_mst_keys(db, right)?; + } + } + Ok(()) +} + +pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> { + let mut db: BlockStore<libipld::DefaultParams> = + { BlockStore::open(db_path, Default::default())? }; + + let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; + if all_aliases.is_empty() { + error!("expected at least one alias in block store"); + std::process::exit(-1); + } + let (alias, commit_cid) = all_aliases[0].clone(); + info!( + "starting from {} [{}]", + commit_cid, + String::from_utf8_lossy(&alias) + ); + + // NOTE: the faster way to develop would have been to decode to libipld::ipld::Ipld first? meh + + debug!( + "raw commit: {:?}", + &db.get_block(&commit_cid)? + .ok_or(anyhow!("expected commit block in store"))? + ); + let commit: CommitNode = DagCborCodec.decode( + &db.get_block(&commit_cid)? + .ok_or(anyhow!("expected commit block in store"))?, + )?; + debug!("Commit: {:?}", commit); + let root: RootNode = DagCborCodec.decode( + &db.get_block(&commit.root)? + .ok_or(anyhow!("expected root block in store"))?, + )?; + debug!("Root: {:?}", root); + let metadata: MetadataNode = DagCborCodec.decode( + &db.get_block(&root.meta)? + .ok_or(anyhow!("expected metadata block in store"))?, + )?; + debug!("Metadata: {:?}", metadata); + let mst_node: MstNode = DagCborCodec.decode( + &db.get_block(&root.data)? + .ok_or(anyhow!("expected block in store"))?, + )?; + debug!("MST root node: {:?}", mst_node); + debug!("============"); + + print_mst_keys(&mut db, &root.data)?; + Ok(()) +} + fn collect_mst_keys( db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid, @@ -174,7 +238,7 @@ fn common_prefix_len(a: &str, b: &str) -> usize { } } // strings are the same, up to common length - a.len() + std::cmp::min(a.len(), b.len()) } #[test] @@ -222,16 +286,19 @@ fn serialize_wip_tree( Ok(cid) } -async fn repro_mst(db_path: &str) -> Result<()> { +pub fn repro_mst(car_path: &PathBuf) -> Result<()> { + + // open a temp block store let mut db: BlockStore<libipld::DefaultParams> = { - let path = std::path::PathBuf::from(db_path); - let path = ipfs_sqlite_block_store::DbPath::File(path); - BlockStore::open_path(path, Default::default())? + BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? }; + // load CAR contents from file + load_car_to_blockstore(&mut db, car_path)?; + let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; if all_aliases.is_empty() { - println!("expected at least one alias in block store"); + error!("expected at least one alias in block store"); std::process::exit(-1); } let (_alias, commit_cid) = all_aliases[0].clone(); @@ -253,34 +320,17 @@ async fn repro_mst(db_path: &str) -> Result<()> { let mut repo_map: BTreeMap<String, Cid> = BTreeMap::new(); collect_mst_keys(&mut db, &root_node.data, &mut repo_map)?; - for (k, v) in repo_map.iter() { - println!("{}\t-> {}", k, v); - } - // now re-generate nodes let updated = generate_mst(&mut db, &mut repo_map)?; - println!("original root: {}", root_node.data); - println!("regenerated : {}", updated); + info!("original root: {}", root_node.data); + info!("regenerated : {}", updated); if root_node.data == updated { - println!("SUCCESS! (amazing)"); + Ok(()) } else { println!("FAILED"); let a = get_mst_node(&mut db, &root_node.data)?; let b = get_mst_node(&mut db, &updated)?; - println!("A: {:?}", a); - println!("B: {:?}", b); - }; - Ok(()) -} - -#[tokio::main] -async fn main() -> Result<()> { - let args: Vec<String> = env::args().collect(); - if args.len() != 2 { - println!("expected 1 args: <db_path>"); - std::process::exit(-1); + Err(anyhow!("FAILED to reproduce MST: {:?} != {:?}", a, b)) } - let db_path = &args[1]; - repro_mst(db_path).await } |