diff options
Diffstat (limited to 'adenosine-pds/src')
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds-dump-mst.rs | 129 | ||||
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds-import.rs | 53 | ||||
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 33 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 71 | ||||
| -rw-r--r-- | adenosine-pds/src/mst.rs (renamed from adenosine-pds/src/bin/adenosine-pds-repro.rs) | 112 | 
5 files changed, 162 insertions, 236 deletions
| diff --git a/adenosine-pds/src/bin/adenosine-pds-dump-mst.rs b/adenosine-pds/src/bin/adenosine-pds-dump-mst.rs deleted file mode 100644 index 1f1b947..0000000 --- a/adenosine-pds/src/bin/adenosine-pds-dump-mst.rs +++ /dev/null @@ -1,129 +0,0 @@ -/// Helper program to print MST keys/docs from a sqlite repo -use anyhow::{anyhow, Result}; -use ipfs_sqlite_block_store::BlockStore; -use libipld::cbor::DagCborCodec; -use libipld::prelude::Codec; -use libipld::{Cid, DagCbor}; - -use std::env; - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct CommitNode { -    root: Cid, -    sig: Box<[u8]>, -} - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct RootNode { -    auth_token: Option<String>, -    prev: Option<Cid>, -    // TODO: not 'metadata'? -    meta: Cid, -    data: Cid, -} - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct MetadataNode { -    datastore: String, // "mst" -    did: String, -    version: u8, // 1 -} - -#[derive(Debug, DagCbor, PartialEq, Eq)] -struct MstEntry { -    k: String, -    p: u32, -    v: Cid, -    t: Option<Cid>, -} - -#[derive(Debug, DagCbor, PartialEq)] -struct MstNode { -    l: Option<Cid>, -    e: Vec<MstEntry>, -} - -fn get_mst_node(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Result<MstNode> { -    let mst_node: MstNode = DagCborCodec.decode( -        &db.get_block(cid)? -            .ok_or(anyhow!("expected block in store"))?, -    )?; -    Ok(mst_node) -} - -fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Result<()> { -    let node = get_mst_node(db, cid)?; -    if let Some(ref left) = node.l { -        print_mst_keys(db, left)?; -    } -    let mut key: String = "".to_string(); -    for entry in node.e.iter() { -        key = format!("{}{}", &key[0..entry.p as usize], entry.k); -        println!("{}\t-> {}", key, entry.v); -        if let Some(ref right) = entry.t { -            print_mst_keys(db, right)?; -        } -    } -    Ok(()) -} - -async fn dump_mst_keys(db_path: &str) -> Result<()> { -    let mut db: BlockStore<libipld::DefaultParams> = { -        let path = std::path::PathBuf::from(db_path); -        let path = ipfs_sqlite_block_store::DbPath::File(path); -        BlockStore::open_path(path, Default::default())? -    }; - -    let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; -    if all_aliases.is_empty() { -        println!("expected at least one alias in block store"); -        std::process::exit(-1); -    } -    let (alias, commit_cid) = all_aliases[0].clone(); -    println!( -        "starting from {} [{}]", -        commit_cid, -        String::from_utf8_lossy(&alias) -    ); - -    // NOTE: the faster way to develop would have been to decode to libipld::ipld::Ipld first? meh - -    //println!("raw commit: {:?}", &db.get_block(&commit_cid)?.ok_or(anyhow!("expected commit block in store"))?); -    let commit: CommitNode = DagCborCodec.decode( -        &db.get_block(&commit_cid)? -            .ok_or(anyhow!("expected commit block in store"))?, -    )?; -    println!("Commit: {:?}", commit); -    //println!("raw root: {:?}", &db.get_block(&commit.root)?.ok_or(anyhow!("expected commit block in store"))?); -    let root: RootNode = DagCborCodec.decode( -        &db.get_block(&commit.root)? -            .ok_or(anyhow!("expected root block in store"))?, -    )?; -    println!("Root: {:?}", root); -    let metadata: MetadataNode = DagCborCodec.decode( -        &db.get_block(&root.meta)? -            .ok_or(anyhow!("expected metadata block in store"))?, -    )?; -    println!("Metadata: {:?}", metadata); -    let mst_node: MstNode = DagCborCodec.decode( -        &db.get_block(&root.data)? -            .ok_or(anyhow!("expected block in store"))?, -    )?; -    println!("MST root node: {:?}", mst_node); - -    println!("============"); - -    print_mst_keys(&mut db, &root.data)?; -    Ok(()) -} - -#[tokio::main] -async fn main() -> Result<()> { -    let args: Vec<String> = env::args().collect(); -    if args.len() != 2 { -        println!("expected 1 args: <db_path>"); -        std::process::exit(-1); -    } -    let db_path = &args[1]; -    dump_mst_keys(db_path).await -} diff --git a/adenosine-pds/src/bin/adenosine-pds-import.rs b/adenosine-pds/src/bin/adenosine-pds-import.rs deleted file mode 100644 index 37ea6eb..0000000 --- a/adenosine-pds/src/bin/adenosine-pds-import.rs +++ /dev/null @@ -1,53 +0,0 @@ -/// Helper program to import an IPLD CARv1 file in to sqlite data store -use anyhow::{anyhow, Result}; -use futures::TryStreamExt; -use ipfs_sqlite_block_store::BlockStore; -use iroh_car::CarReader; -use libipld::block::Block; -use tokio::fs::File; -use tokio::io::BufReader; - -use std::env; - -async fn load_car_to_sqlite(db_path: &str, car_path: &str) -> Result<()> { -    let car_reader = { -        let file = File::open(car_path).await?; -        let buf_reader = BufReader::new(file); -        CarReader::new(buf_reader).await? -    }; -    let car_header = car_reader.header().clone(); -    let mut db: BlockStore<libipld::DefaultParams> = { -        let path = std::path::PathBuf::from(db_path); -        let path = ipfs_sqlite_block_store::DbPath::File(path); -        BlockStore::open_path(path, Default::default())? -    }; - -    car_reader -        .stream() -        .try_for_each(|(cid, raw)| { -            // TODO: error handling here instead of unwrap (?) -            let block = Block::new(cid, raw).unwrap(); -            db.put_block(block, None).unwrap(); -            futures::future::ready(Ok(())) -        }) -        .await?; - -    // pin the header -    if car_header.roots().len() >= 1 { -        db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; -    } - -    Ok(()) -} - -#[tokio::main] -async fn main() -> Result<()> { -    let args: Vec<String> = env::args().collect(); -    if args.len() != 3 { -        println!("expected 2 args: <db_path> <car_path>"); -        std::process::exit(-1); -    } -    let db_path = &args[1]; -    let car_path = &args[2]; -    load_car_to_sqlite(db_path, car_path).await -} diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index 5d0e638..44c4cef 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -4,7 +4,6 @@ use anyhow::Result;  use log::{self, debug};  use structopt::StructOpt; -  #[derive(StructOpt)]  #[structopt(      rename_all = "kebab-case", @@ -12,7 +11,6 @@ use structopt::StructOpt;  )]  struct Opt {      // TODO: different path type for structopt? -      /// File path of sqlite database for storing IPLD blocks (aka, repository content)      #[structopt(          parse(from_os_str), @@ -44,12 +42,18 @@ struct Opt {  #[derive(StructOpt)]  enum Command {      /// Start ATP server as a foreground process -    Serve, +    Serve { +        #[structopt(long, default_value = "3030")] +        port: u16, +    }, -    /// Import a CAR file (TODO) -    Import, +    /// Helper to import an IPLD CARv1 file in to sqlite data store +    Import { +        /// CARv1 file path to import from +        car_path: std::path::PathBuf, +    }, -    /// Dump info from databases (TODO) +    /// Helper to print MST keys/docs from a sqlite repo      Inspect,  } @@ -74,16 +78,13 @@ fn main() -> Result<()> {      debug!("config parsed, starting up");      match opt.cmd { -        Command::Serve {} => { +        Command::Serve { port } => {              // TODO: log some config stuff? -            run_server() -        }, -        Command::Import {} => { -            unimplemented!() -        }, -        Command::Inspect {} => { -            unimplemented!() -        }, +            run_server(port) +        } +        Command::Import { car_path } => { +            load_car_to_sqlite(&opt.blockstore_db_path, &car_path) +        } +        Command::Inspect {} => dump_mst_keys(&opt.blockstore_db_path),      }  } - diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index f321d3f..1785640 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,20 +1,35 @@ -  use anyhow::Result; -use log::{info, error}; -use rouille::{Request, Response, router}; +use log::{error, info}; +use rouille::{router, Request, Response}; + +use futures::TryStreamExt; +use ipfs_sqlite_block_store::BlockStore; +use iroh_car::CarReader; +use libipld::block::Block; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::BufReader; -pub fn run_server() -> Result<()> { +mod mst; +pub use mst::{dump_mst_keys, repro_mst}; + +pub fn run_server(port: u16) -> Result<()> {      // TODO: log access requests      // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs -    let log_ok = |req: &Request, resp: &Response, elap: std::time::Duration| { +    let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {          info!("{} {} ({:?})", req.method(), req.raw_url(), elap);      };      let log_err = |req: &Request, elap: std::time::Duration| { -        error!("HTTP handler panicked: {} {} ({:?})", req.method(), req.raw_url(), elap); +        error!( +            "HTTP handler panicked: {} {} ({:?})", +            req.method(), +            req.raw_url(), +            elap +        );      }; -    rouille::start_server("localhost:3030", move |request| { +    rouille::start_server(format!("localhost:{}", port), move |request| {          rouille::log_custom(request, log_ok, log_err, || {              router!(request,                  (GET) ["/"] => { @@ -33,3 +48,45 @@ pub fn run_server() -> Result<()> {          })      });  } + +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { +    let mut db: BlockStore<libipld::DefaultParams> = +        { BlockStore::open(db_path, Default::default())? }; + +    load_car_to_blockstore(&mut db, car_path) +} + +pub fn load_car_to_blockstore(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> { +    let rt = tokio::runtime::Builder::new_current_thread() +        .enable_all() +        .build()?; +    rt.block_on(inner_car_loader(db, car_path)) +} + +// this async function is wrapped in the sync version above +async fn inner_car_loader(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> { +    println!("{} - {}", std::env::current_dir()?.display(), car_path.display()); +    let car_reader = { +        let file = File::open(car_path).await?; +        let buf_reader = BufReader::new(file); +        CarReader::new(buf_reader).await? +    }; +    let car_header = car_reader.header().clone(); + +    car_reader +        .stream() +        .try_for_each(|(cid, raw)| { +            // TODO: error handling here instead of unwrap (?) +            let block = Block::new(cid, raw).unwrap(); +            db.put_block(block, None).unwrap(); +            futures::future::ready(Ok(())) +        }) +        .await?; + +    // pin the header +    if car_header.roots().len() >= 1 { +        db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; +    } + +    Ok(()) +} diff --git a/adenosine-pds/src/bin/adenosine-pds-repro.rs b/adenosine-pds/src/mst.rs index 05048ce..a2a394f 100644 --- a/adenosine-pds/src/bin/adenosine-pds-repro.rs +++ b/adenosine-pds/src/mst.rs @@ -1,5 +1,3 @@ -/// Development helper which loads MST keys and CIDs, re-generates MST structure, then compares -/// root node with what was originally found.  use anyhow::{anyhow, Result};  use ipfs_sqlite_block_store::BlockStore;  use libipld::cbor::DagCborCodec; @@ -8,9 +6,10 @@ use libipld::prelude::Codec;  use libipld::store::DefaultParams;  use libipld::Block;  use libipld::{Cid, DagCbor}; +use log::{debug, error, info};  use std::collections::BTreeMap; - -use std::env; +use std::path::PathBuf; +use crate::load_car_to_blockstore;  #[derive(Debug, DagCbor, PartialEq, Eq)]  struct CommitNode { @@ -69,6 +68,71 @@ fn get_mst_node(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Resul      Ok(mst_node)  } +fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Result<()> { +    let node = get_mst_node(db, cid)?; +    if let Some(ref left) = node.l { +        print_mst_keys(db, left)?; +    } +    let mut key: String = "".to_string(); +    for entry in node.e.iter() { +        key = format!("{}{}", &key[0..entry.p as usize], entry.k); +        println!("{}\t-> {}", key, entry.v); +        if let Some(ref right) = entry.t { +            print_mst_keys(db, right)?; +        } +    } +    Ok(()) +} + +pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> { +    let mut db: BlockStore<libipld::DefaultParams> = +        { BlockStore::open(db_path, Default::default())? }; + +    let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; +    if all_aliases.is_empty() { +        error!("expected at least one alias in block store"); +        std::process::exit(-1); +    } +    let (alias, commit_cid) = all_aliases[0].clone(); +    info!( +        "starting from {} [{}]", +        commit_cid, +        String::from_utf8_lossy(&alias) +    ); + +    // NOTE: the faster way to develop would have been to decode to libipld::ipld::Ipld first? meh + +    debug!( +        "raw commit: {:?}", +        &db.get_block(&commit_cid)? +            .ok_or(anyhow!("expected commit block in store"))? +    ); +    let commit: CommitNode = DagCborCodec.decode( +        &db.get_block(&commit_cid)? +            .ok_or(anyhow!("expected commit block in store"))?, +    )?; +    debug!("Commit: {:?}", commit); +    let root: RootNode = DagCborCodec.decode( +        &db.get_block(&commit.root)? +            .ok_or(anyhow!("expected root block in store"))?, +    )?; +    debug!("Root: {:?}", root); +    let metadata: MetadataNode = DagCborCodec.decode( +        &db.get_block(&root.meta)? +            .ok_or(anyhow!("expected metadata block in store"))?, +    )?; +    debug!("Metadata: {:?}", metadata); +    let mst_node: MstNode = DagCborCodec.decode( +        &db.get_block(&root.data)? +            .ok_or(anyhow!("expected block in store"))?, +    )?; +    debug!("MST root node: {:?}", mst_node); +    debug!("============"); + +    print_mst_keys(&mut db, &root.data)?; +    Ok(()) +} +  fn collect_mst_keys(      db: &mut BlockStore<libipld::DefaultParams>,      cid: &Cid, @@ -174,7 +238,7 @@ fn common_prefix_len(a: &str, b: &str) -> usize {          }      }      // strings are the same, up to common length -    a.len() +    std::cmp::min(a.len(), b.len())  }  #[test] @@ -222,16 +286,19 @@ fn serialize_wip_tree(      Ok(cid)  } -async fn repro_mst(db_path: &str) -> Result<()> { +pub fn repro_mst(car_path: &PathBuf) -> Result<()> { + +    // open a temp block store      let mut db: BlockStore<libipld::DefaultParams> = { -        let path = std::path::PathBuf::from(db_path); -        let path = ipfs_sqlite_block_store::DbPath::File(path); -        BlockStore::open_path(path, Default::default())? +        BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())?      }; +    // load CAR contents from file +    load_car_to_blockstore(&mut db, car_path)?; +      let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?;      if all_aliases.is_empty() { -        println!("expected at least one alias in block store"); +        error!("expected at least one alias in block store");          std::process::exit(-1);      }      let (_alias, commit_cid) = all_aliases[0].clone(); @@ -253,34 +320,17 @@ async fn repro_mst(db_path: &str) -> Result<()> {      let mut repo_map: BTreeMap<String, Cid> = BTreeMap::new();      collect_mst_keys(&mut db, &root_node.data, &mut repo_map)?; -    for (k, v) in repo_map.iter() { -        println!("{}\t-> {}", k, v); -    } -      // now re-generate nodes      let updated = generate_mst(&mut db, &mut repo_map)?; -    println!("original root: {}", root_node.data); -    println!("regenerated  : {}", updated); +    info!("original root: {}", root_node.data); +    info!("regenerated  : {}", updated);      if root_node.data == updated { -        println!("SUCCESS! (amazing)"); +        Ok(())      } else {          println!("FAILED");          let a = get_mst_node(&mut db, &root_node.data)?;          let b = get_mst_node(&mut db, &updated)?; -        println!("A: {:?}", a); -        println!("B: {:?}", b); -    }; -    Ok(()) -} - -#[tokio::main] -async fn main() -> Result<()> { -    let args: Vec<String> = env::args().collect(); -    if args.len() != 2 { -        println!("expected 1 args: <db_path>"); -        std::process::exit(-1); +        Err(anyhow!("FAILED to reproduce MST: {:?} != {:?}", a, b))      } -    let db_path = &args[1]; -    repro_mst(db_path).await  } | 
