diff options
Diffstat (limited to 'adenosine-pds/src/car.rs')
-rw-r--r-- | adenosine-pds/src/car.rs | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs new file mode 100644 index 0000000..5731848 --- /dev/null +++ b/adenosine-pds/src/car.rs @@ -0,0 +1,61 @@ +use anyhow::Result; + +use futures::TryStreamExt; +use ipfs_sqlite_block_store::BlockStore; +use iroh_car::CarReader; +use libipld::block::Block; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::BufReader; + +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { + let mut db: BlockStore<libipld::DefaultParams> = + { BlockStore::open(db_path, Default::default())? }; + + load_car_to_blockstore(&mut db, car_path) +} + +pub fn load_car_to_blockstore( + db: &mut BlockStore<libipld::DefaultParams>, + car_path: &PathBuf, +) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_loader(db, car_path)) +} + +// this async function is wrapped in the sync version above +async fn inner_car_loader( + db: &mut BlockStore<libipld::DefaultParams>, + car_path: &PathBuf, +) -> Result<()> { + println!( + "{} - {}", + std::env::current_dir()?.display(), + car_path.display() + ); + let car_reader = { + let file = File::open(car_path).await?; + let buf_reader = BufReader::new(file); + CarReader::new(buf_reader).await? + }; + let car_header = car_reader.header().clone(); + + car_reader + .stream() + .try_for_each(|(cid, raw)| { + // TODO: error handling here instead of unwrap (?) + let block = Block::new(cid, raw).unwrap(); + db.put_block(block, None).unwrap(); + futures::future::ready(Ok(())) + }) + .await?; + + // pin the header + if car_header.roots().len() >= 1 { + db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; + } + + Ok(()) +} |