diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-07 17:20:28 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-07 17:20:28 -0800 |
commit | 9c8aa3d684575b5b5f169b8f6aca75919283d251 (patch) | |
tree | 58bc8ca8407b18e5d708d3b68c1f506e29e88be4 /adenosine-pds/src/car.rs | |
parent | 2f414df00e378728701e4061cdb3bebad5df798a (diff) | |
download | adenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.tar.gz adenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.zip |
pds: implement CAR import/export at repo level
Diffstat (limited to 'adenosine-pds/src/car.rs')
-rw-r--r-- | adenosine-pds/src/car.rs | 86 |
1 files changed, 61 insertions, 25 deletions
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs index 43b4e1f..832c87e 100644 --- a/adenosine-pds/src/car.rs +++ b/adenosine-pds/src/car.rs @@ -1,50 +1,76 @@ use anyhow::Result; -use crate::vendored::iroh_car::CarReader; +use crate::vendored::iroh_car::{CarHeader, CarReader, CarWriter}; use futures::TryStreamExt; use ipfs_sqlite_block_store::BlockStore; use libipld::{Block, Cid}; use std::path::PathBuf; use tokio::fs::File; -use tokio::io::BufReader; +use tokio::io::{AsyncRead, BufReader}; -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> { - let mut db: BlockStore<libipld::DefaultParams> = - { BlockStore::open(db_path, Default::default())? }; - - load_car_to_blockstore(&mut db, car_path, alias)?; - Ok(()) +/// Synchronous wrapper for loading in-memory CAR bytes (`&[u8]`) into a blockstore. +/// +/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file +/// header. +pub fn load_car_bytes_to_blockstore( + db: &mut BlockStore<libipld::DefaultParams>, + car_bytes: &[u8], +) -> Result<Cid> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_bytes_loader(db, car_bytes)) } -pub fn load_car_to_blockstore( +/// Synchronous wrapper for loading on-disk CAR file (by path) into a blockstore. +/// +/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file +/// header. +pub fn load_car_path_to_blockstore( db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf, - alias: &str, ) -> Result<Cid> { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(inner_car_loader(db, car_path, alias)) + rt.block_on(inner_car_path_loader(db, car_path)) +} + +pub fn read_car_bytes_from_blockstore( + db: &mut BlockStore<libipld::DefaultParams>, + root: &Cid, +) -> Result<Vec<u8>> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_bytes_reader(db, root)) } -// this async function is wrapped in the sync version above -async fn inner_car_loader( +async fn inner_car_bytes_loader( + db: &mut BlockStore<libipld::DefaultParams>, + car_bytes: &[u8], +) -> Result<Cid> { + let car_reader = CarReader::new(car_bytes).await?; + inner_car_loader(db, car_reader).await +} + +async fn inner_car_path_loader( db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf, - alias: &str, ) -> Result<Cid> { - println!( - "{} - {}", - std::env::current_dir()?.display(), - car_path.display() - ); let car_reader = { let file = File::open(car_path).await?; let buf_reader = BufReader::new(file); CarReader::new(buf_reader).await? }; - let car_header = car_reader.header().clone(); + inner_car_loader(db, car_reader).await +} +async fn inner_car_loader<R: AsyncRead + Send + Unpin>( + db: &mut BlockStore<libipld::DefaultParams>, + car_reader: CarReader<R>, +) -> Result<Cid> { + let car_header = car_reader.header().clone(); car_reader .stream() .try_for_each(|(cid, raw)| { @@ -54,11 +80,21 @@ async fn inner_car_loader( futures::future::ready(Ok(())) }) .await?; + Ok(car_header.roots()[0]) +} - // pin the header (?) - if !car_header.roots().is_empty() { - db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?; - } +async fn inner_car_bytes_reader( + db: &mut BlockStore<libipld::DefaultParams>, + root: &Cid, +) -> Result<Vec<u8>> { + let car_header = CarHeader::new_v1(vec![root.clone()]); + let buf: Vec<u8> = Default::default(); + let mut car_writer = CarWriter::new(car_header, buf); - Ok(car_header.roots()[0]) + let cid_list = db.get_descendants::<Vec<_>>(root)?; + for cid in cid_list { + let block = db.get_block(&cid)?.expect("block content"); + car_writer.write(cid, block).await?; + } + Ok(car_writer.finish().await?) } |