aboutsummaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src/car.rs
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-11-07 17:20:28 -0800
committerBryan Newbold <bnewbold@robocracy.org>2022-11-07 17:20:28 -0800
commit9c8aa3d684575b5b5f169b8f6aca75919283d251 (patch)
tree58bc8ca8407b18e5d708d3b68c1f506e29e88be4 /adenosine-pds/src/car.rs
parent2f414df00e378728701e4061cdb3bebad5df798a (diff)
downloadadenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.tar.gz
adenosine-9c8aa3d684575b5b5f169b8f6aca75919283d251.zip
pds: implement CAR import/export at repo level
Diffstat (limited to 'adenosine-pds/src/car.rs')
-rw-r--r--adenosine-pds/src/car.rs86
1 files changed, 61 insertions, 25 deletions
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs
index 43b4e1f..832c87e 100644
--- a/adenosine-pds/src/car.rs
+++ b/adenosine-pds/src/car.rs
@@ -1,50 +1,76 @@
use anyhow::Result;
-use crate::vendored::iroh_car::CarReader;
+use crate::vendored::iroh_car::{CarHeader, CarReader, CarWriter};
use futures::TryStreamExt;
use ipfs_sqlite_block_store::BlockStore;
use libipld::{Block, Cid};
use std::path::PathBuf;
use tokio::fs::File;
-use tokio::io::BufReader;
+use tokio::io::{AsyncRead, BufReader};
-pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> {
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open(db_path, Default::default())? };
-
- load_car_to_blockstore(&mut db, car_path, alias)?;
- Ok(())
+/// Synchronous wrapper for loading in-memory CAR bytes (`&[u8]`) into a blockstore.
+///
+/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file
+/// header.
+pub fn load_car_bytes_to_blockstore(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_bytes: &[u8],
+) -> Result<Cid> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?;
+ rt.block_on(inner_car_bytes_loader(db, car_bytes))
}
-pub fn load_car_to_blockstore(
+/// Synchronous wrapper for loading on-disk CAR file (by path) into a blockstore.
+///
+/// Does not do any pinning, even temporarily. Returns the root CID indicated in the CAR file
+/// header.
+pub fn load_car_path_to_blockstore(
db: &mut BlockStore<libipld::DefaultParams>,
car_path: &PathBuf,
- alias: &str,
) -> Result<Cid> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
- rt.block_on(inner_car_loader(db, car_path, alias))
+ rt.block_on(inner_car_path_loader(db, car_path))
+}
+
+pub fn read_car_bytes_from_blockstore(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ root: &Cid,
+) -> Result<Vec<u8>> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?;
+ rt.block_on(inner_car_bytes_reader(db, root))
}
-// this async function is wrapped in the sync version above
-async fn inner_car_loader(
+async fn inner_car_bytes_loader(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_bytes: &[u8],
+) -> Result<Cid> {
+ let car_reader = CarReader::new(car_bytes).await?;
+ inner_car_loader(db, car_reader).await
+}
+
+async fn inner_car_path_loader(
db: &mut BlockStore<libipld::DefaultParams>,
car_path: &PathBuf,
- alias: &str,
) -> Result<Cid> {
- println!(
- "{} - {}",
- std::env::current_dir()?.display(),
- car_path.display()
- );
let car_reader = {
let file = File::open(car_path).await?;
let buf_reader = BufReader::new(file);
CarReader::new(buf_reader).await?
};
- let car_header = car_reader.header().clone();
+ inner_car_loader(db, car_reader).await
+}
+async fn inner_car_loader<R: AsyncRead + Send + Unpin>(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_reader: CarReader<R>,
+) -> Result<Cid> {
+ let car_header = car_reader.header().clone();
car_reader
.stream()
.try_for_each(|(cid, raw)| {
@@ -54,11 +80,21 @@ async fn inner_car_loader(
futures::future::ready(Ok(()))
})
.await?;
+ Ok(car_header.roots()[0])
+}
- // pin the header (?)
- if !car_header.roots().is_empty() {
- db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?;
- }
+async fn inner_car_bytes_reader(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ root: &Cid,
+) -> Result<Vec<u8>> {
+ let car_header = CarHeader::new_v1(vec![root.clone()]);
+ let buf: Vec<u8> = Default::default();
+ let mut car_writer = CarWriter::new(car_header, buf);
- Ok(car_header.roots()[0])
+ let cid_list = db.get_descendants::<Vec<_>>(root)?;
+ for cid in cid_list {
+ let block = db.get_block(&cid)?.expect("block content");
+ car_writer.write(cid, block).await?;
+ }
+ Ok(car_writer.finish().await?)
}