diff options
Diffstat (limited to 'iroh-car/src')
| -rw-r--r-- | iroh-car/src/error.rs | 28 | ||||
| -rw-r--r-- | iroh-car/src/header.rs | 104 | ||||
| -rw-r--r-- | iroh-car/src/lib.rs | 11 | ||||
| -rw-r--r-- | iroh-car/src/reader.rs | 99 | ||||
| -rw-r--r-- | iroh-car/src/util.rs | 95 | ||||
| -rw-r--r-- | iroh-car/src/writer.rs | 71 | 
6 files changed, 0 insertions, 408 deletions
| diff --git a/iroh-car/src/error.rs b/iroh-car/src/error.rs deleted file mode 100644 index 3579413..0000000 --- a/iroh-car/src/error.rs +++ /dev/null @@ -1,28 +0,0 @@ -use thiserror::Error; - -/// Car utility error -#[derive(Debug, Error)] -pub enum Error { -    #[error("Failed to parse CAR file: {0}")] -    Parsing(String), -    #[error("Invalid CAR file: {0}")] -    InvalidFile(String), -    #[error("Io error: {0}")] -    Io(#[from] std::io::Error), -    #[error("Cbor encoding error: {0}")] -    Cbor(#[from] ipld::error::Error), -    #[error("ld read too large {0}")] -    LdReadTooLarge(usize), -} - -impl From<cid::Error> for Error { -    fn from(err: cid::Error) -> Error { -        Error::Parsing(err.to_string()) -    } -} - -impl From<cid::multihash::Error> for Error { -    fn from(err: cid::multihash::Error) -> Error { -        Error::Parsing(err.to_string()) -    } -} diff --git a/iroh-car/src/header.rs b/iroh-car/src/header.rs deleted file mode 100644 index c004e35..0000000 --- a/iroh-car/src/header.rs +++ /dev/null @@ -1,104 +0,0 @@ -use cid::Cid; -use ipld::codec::Codec; -use ipld_cbor::DagCborCodec; - -use crate::error::Error; - -/// A car header. -#[derive(Debug, Clone, PartialEq, Eq)] -#[non_exhaustive] -pub enum CarHeader { -    V1(CarHeaderV1), -} - -impl CarHeader { -    pub fn new_v1(roots: Vec<Cid>) -> Self { -        Self::V1(roots.into()) -    } - -    pub fn decode(buffer: &[u8]) -> Result<Self, Error> { -        let header: CarHeaderV1 = DagCborCodec -            .decode(buffer) -            .map_err(|e| Error::Parsing(e.to_string()))?; - -        if header.roots.is_empty() { -            return Err(Error::Parsing("empty CAR file".to_owned())); -        } - -        if header.version != 1 { -            return Err(Error::InvalidFile( -                "Only CAR file version 1 is supported".to_string(), -            )); -        } - -        Ok(CarHeader::V1(header)) -    } - -    pub fn encode(&self) -> Result<Vec<u8>, Error> { -        match self { -            CarHeader::V1(ref header) => { -                let res = DagCborCodec.encode(header)?; -                Ok(res) -            } -        } -    } - -    pub fn roots(&self) -> &[Cid] { -        match self { -            CarHeader::V1(header) => &header.roots, -        } -    } - -    pub fn version(&self) -> u64 { -        match self { -            CarHeader::V1(_) => 1, -        } -    } -} - -/// CAR file header version 1. -#[derive(Debug, Clone, Default, ipld::DagCbor, PartialEq, Eq)] -pub struct CarHeaderV1 { -    #[ipld] -    pub roots: Vec<Cid>, -    #[ipld] -    pub version: u64, -} - -impl CarHeaderV1 { -    /// Creates a new CAR file header -    pub fn new(roots: Vec<Cid>, version: u64) -> Self { -        Self { roots, version } -    } -} - -impl From<Vec<Cid>> for CarHeaderV1 { -    fn from(roots: Vec<Cid>) -> Self { -        Self { roots, version: 1 } -    } -} - -#[cfg(test)] -mod tests { -    use ipld::codec::{Decode, Encode}; -    use ipld_cbor::DagCborCodec; -    use multihash::MultihashDigest; - -    use super::*; - -    #[test] -    fn symmetric_header_v1() { -        let digest = multihash::Code::Blake2b256.digest(b"test"); -        let cid = Cid::new_v1(DagCborCodec.into(), digest); - -        let header = CarHeaderV1::from(vec![cid]); - -        let mut bytes = Vec::new(); -        header.encode(DagCborCodec, &mut bytes).unwrap(); - -        assert_eq!( -            CarHeaderV1::decode(DagCborCodec, &mut std::io::Cursor::new(&bytes)).unwrap(), -            header -        ); -    } -} diff --git a/iroh-car/src/lib.rs b/iroh-car/src/lib.rs deleted file mode 100644 index d4e5f66..0000000 --- a/iroh-car/src/lib.rs +++ /dev/null @@ -1,11 +0,0 @@ -//! Implementation of the [car](https://ipld.io/specs/transport/car/) format. - -mod error; -mod header; -mod reader; -mod util; -mod writer; - -pub use crate::header::CarHeader; -pub use crate::reader::CarReader; -pub use crate::writer::CarWriter; diff --git a/iroh-car/src/reader.rs b/iroh-car/src/reader.rs deleted file mode 100644 index c0209be..0000000 --- a/iroh-car/src/reader.rs +++ /dev/null @@ -1,99 +0,0 @@ -use cid::Cid; -use futures::Stream; -use tokio::io::AsyncRead; - -use crate::{ -    error::Error, -    header::CarHeader, -    util::{ld_read, read_node}, -}; - -/// Reads CAR files that are in a BufReader -pub struct CarReader<R> { -    reader: R, -    header: CarHeader, -    buffer: Vec<u8>, -} - -impl<R> CarReader<R> -where -    R: AsyncRead + Send + Unpin, -{ -    /// Creates a new CarReader and parses the CarHeader -    pub async fn new(mut reader: R) -> Result<Self, Error> { -        let mut buffer = Vec::new(); - -        match ld_read(&mut reader, &mut buffer).await? { -            Some(buf) => { -                let header = CarHeader::decode(buf)?; - -                Ok(CarReader { -                    reader, -                    header, -                    buffer, -                }) -            } -            None => Err(Error::Parsing( -                "failed to parse uvarint for header".to_string(), -            )), -        } -    } - -    /// Returns the header of this car file. -    pub fn header(&self) -> &CarHeader { -        &self.header -    } - -    /// Returns the next IPLD Block in the buffer -    pub async fn next_block(&mut self) -> Result<Option<(Cid, Vec<u8>)>, Error> { -        read_node(&mut self.reader, &mut self.buffer).await -    } - -    pub fn stream(self) -> impl Stream<Item = Result<(Cid, Vec<u8>), Error>> { -        futures::stream::try_unfold(self, |mut this| async move { -            let maybe_block = read_node(&mut this.reader, &mut this.buffer).await?; -            Ok(maybe_block.map(|b| (b, this))) -        }) -    } -} - -#[cfg(test)] -mod tests { -    use std::io::Cursor; - -    use cid::Cid; -    use futures::TryStreamExt; -    use ipld_cbor::DagCborCodec; -    use multihash::MultihashDigest; - -    use crate::{header::CarHeaderV1, writer::CarWriter}; - -    use super::*; - -    #[tokio::test] -    async fn car_write_read() { -        let digest_test = multihash::Code::Blake2b256.digest(b"test"); -        let cid_test = Cid::new_v1(DagCborCodec.into(), digest_test); - -        let digest_foo = multihash::Code::Blake2b256.digest(b"foo"); -        let cid_foo = Cid::new_v1(DagCborCodec.into(), digest_foo); - -        let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo])); - -        let mut buffer = Vec::new(); -        let mut writer = CarWriter::new(header, &mut buffer); -        writer.write(cid_test, b"test").await.unwrap(); -        writer.write(cid_foo, b"foo").await.unwrap(); -        writer.finish().await.unwrap(); - -        let reader = Cursor::new(&buffer); -        let car_reader = CarReader::new(reader).await.unwrap(); -        let files: Vec<_> = car_reader.stream().try_collect().await.unwrap(); - -        assert_eq!(files.len(), 2); -        assert_eq!(files[0].0, cid_test); -        assert_eq!(files[0].1, b"test"); -        assert_eq!(files[1].0, cid_foo); -        assert_eq!(files[1].1, b"foo"); -    } -} diff --git a/iroh-car/src/util.rs b/iroh-car/src/util.rs deleted file mode 100644 index 25be761..0000000 --- a/iroh-car/src/util.rs +++ /dev/null @@ -1,95 +0,0 @@ -use cid::Cid; -use integer_encoding::VarIntAsyncReader; -use tokio::io::{AsyncRead, AsyncReadExt}; - -use super::error::Error; - -/// Maximum size that is used for single node. -pub(crate) const MAX_ALLOC: usize = 4 * 1024 * 1024; - -pub(crate) async fn ld_read<R>(mut reader: R, buf: &mut Vec<u8>) -> Result<Option<&[u8]>, Error> -where -    R: AsyncRead + Send + Unpin, -{ -    let length: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await { -        Ok(len) => len, -        Err(e) => { -            if e.kind() == std::io::ErrorKind::UnexpectedEof { -                return Ok(None); -            } -            return Err(Error::Parsing(e.to_string())); -        } -    }; - -    if length > MAX_ALLOC { -        return Err(Error::LdReadTooLarge(length)); -    } -    if length > buf.len() { -        buf.resize(length, 0); -    } - -    reader -        .read_exact(&mut buf[..length]) -        .await -        .map_err(|e| Error::Parsing(e.to_string()))?; - -    Ok(Some(&buf[..length])) -} - -pub(crate) async fn read_node<R>( -    buf_reader: &mut R, -    buf: &mut Vec<u8>, -) -> Result<Option<(Cid, Vec<u8>)>, Error> -where -    R: AsyncRead + Send + Unpin, -{ -    if let Some(buf) = ld_read(buf_reader, buf).await? { -        let mut cursor = std::io::Cursor::new(buf); -        let c = Cid::read_bytes(&mut cursor)?; -        let pos = cursor.position() as usize; - -        return Ok(Some((c, buf[pos..].to_vec()))); -    } -    Ok(None) -} - -#[cfg(test)] -mod tests { -    use integer_encoding::VarIntAsyncWriter; -    use tokio::io::{AsyncWrite, AsyncWriteExt}; - -    use super::*; - -    async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error> -    where -        W: AsyncWrite + Send + Unpin, -    { -        writer.write_varint_async(bytes.len()).await?; -        writer.write_all(bytes).await?; -        writer.flush().await?; -        Ok(()) -    } - -    #[tokio::test] -    async fn ld_read_write_good() { -        let mut buffer = Vec::<u8>::new(); -        ld_write(&mut buffer, b"test bytes").await.unwrap(); -        let reader = std::io::Cursor::new(buffer); - -        let mut buffer = vec![1u8; 1024]; -        let read = ld_read(reader, &mut buffer).await.unwrap().unwrap(); -        assert_eq!(read, b"test bytes"); -    } - -    #[tokio::test] -    async fn ld_read_write_fail() { -        let mut buffer = Vec::<u8>::new(); -        let size = MAX_ALLOC + 1; -        ld_write(&mut buffer, &vec![2u8; size]).await.unwrap(); -        let reader = std::io::Cursor::new(buffer); - -        let mut buffer = vec![1u8; 1024]; -        let read = ld_read(reader, &mut buffer).await; -        assert!(matches!(read, Err(Error::LdReadTooLarge(_)))); -    } -} diff --git a/iroh-car/src/writer.rs b/iroh-car/src/writer.rs deleted file mode 100644 index 9f17eb9..0000000 --- a/iroh-car/src/writer.rs +++ /dev/null @@ -1,71 +0,0 @@ -use cid::Cid; -use integer_encoding::VarIntAsyncWriter; -use tokio::io::{AsyncWrite, AsyncWriteExt}; - -use crate::{error::Error, header::CarHeader}; - -#[derive(Debug)] -pub struct CarWriter<W> { -    header: CarHeader, -    writer: W, -    cid_buffer: Vec<u8>, -    is_header_written: bool, -} - -impl<W> CarWriter<W> -where -    W: AsyncWrite + Send + Unpin, -{ -    pub fn new(header: CarHeader, writer: W) -> Self { -        CarWriter { -            header, -            writer, -            cid_buffer: Vec::new(), -            is_header_written: false, -        } -    } - -    /// Writes header and stream of data to writer in Car format. -    pub async fn write<T>(&mut self, cid: Cid, data: T) -> Result<(), Error> -    where -        T: AsRef<[u8]>, -    { -        if !self.is_header_written { -            // Write header bytes -            let header_bytes = self.header.encode()?; -            self.writer.write_varint_async(header_bytes.len()).await?; -            self.writer.write_all(&header_bytes).await?; -            self.is_header_written = true; -        } - -        // Write the given block. -        self.cid_buffer.clear(); -        cid.write_bytes(&mut self.cid_buffer).expect("vec write"); - -        let data = data.as_ref(); -        let len = self.cid_buffer.len() + data.len(); - -        self.writer.write_varint_async(len).await?; -        self.writer.write_all(&self.cid_buffer).await?; -        self.writer.write_all(data).await?; - -        Ok(()) -    } - -    /// Finishes writing, including flushing and returns the writer. -    pub async fn finish(mut self) -> Result<W, Error> { -        self.flush().await?; -        Ok(self.writer) -    } - -    /// Flushes the underlying writer. -    pub async fn flush(&mut self) -> Result<(), Error> { -        self.writer.flush().await?; -        Ok(()) -    } - -    /// Consumes the [`CarWriter`] and returns the underlying writer. -    pub fn into_inner(self) -> W { -        self.writer -    } -} | 
