diff options
Diffstat (limited to 'iroh-car/src')
| -rw-r--r-- | iroh-car/src/error.rs | 28 | ||||
| -rw-r--r-- | iroh-car/src/header.rs | 104 | ||||
| -rw-r--r-- | iroh-car/src/lib.rs | 11 | ||||
| -rw-r--r-- | iroh-car/src/reader.rs | 99 | ||||
| -rw-r--r-- | iroh-car/src/util.rs | 95 | ||||
| -rw-r--r-- | iroh-car/src/writer.rs | 71 | 
6 files changed, 408 insertions, 0 deletions
| diff --git a/iroh-car/src/error.rs b/iroh-car/src/error.rs new file mode 100644 index 0000000..3579413 --- /dev/null +++ b/iroh-car/src/error.rs @@ -0,0 +1,28 @@ +use thiserror::Error; + +/// Car utility error +#[derive(Debug, Error)] +pub enum Error { +    #[error("Failed to parse CAR file: {0}")] +    Parsing(String), +    #[error("Invalid CAR file: {0}")] +    InvalidFile(String), +    #[error("Io error: {0}")] +    Io(#[from] std::io::Error), +    #[error("Cbor encoding error: {0}")] +    Cbor(#[from] ipld::error::Error), +    #[error("ld read too large {0}")] +    LdReadTooLarge(usize), +} + +impl From<cid::Error> for Error { +    fn from(err: cid::Error) -> Error { +        Error::Parsing(err.to_string()) +    } +} + +impl From<cid::multihash::Error> for Error { +    fn from(err: cid::multihash::Error) -> Error { +        Error::Parsing(err.to_string()) +    } +} diff --git a/iroh-car/src/header.rs b/iroh-car/src/header.rs new file mode 100644 index 0000000..c004e35 --- /dev/null +++ b/iroh-car/src/header.rs @@ -0,0 +1,104 @@ +use cid::Cid; +use ipld::codec::Codec; +use ipld_cbor::DagCborCodec; + +use crate::error::Error; + +/// A car header. +#[derive(Debug, Clone, PartialEq, Eq)] +#[non_exhaustive] +pub enum CarHeader { +    V1(CarHeaderV1), +} + +impl CarHeader { +    pub fn new_v1(roots: Vec<Cid>) -> Self { +        Self::V1(roots.into()) +    } + +    pub fn decode(buffer: &[u8]) -> Result<Self, Error> { +        let header: CarHeaderV1 = DagCborCodec +            .decode(buffer) +            .map_err(|e| Error::Parsing(e.to_string()))?; + +        if header.roots.is_empty() { +            return Err(Error::Parsing("empty CAR file".to_owned())); +        } + +        if header.version != 1 { +            return Err(Error::InvalidFile( +                "Only CAR file version 1 is supported".to_string(), +            )); +        } + +        Ok(CarHeader::V1(header)) +    } + +    pub fn encode(&self) -> Result<Vec<u8>, Error> { +        match self { +            CarHeader::V1(ref header) => { +                let res = DagCborCodec.encode(header)?; +                Ok(res) +            } +        } +    } + +    pub fn roots(&self) -> &[Cid] { +        match self { +            CarHeader::V1(header) => &header.roots, +        } +    } + +    pub fn version(&self) -> u64 { +        match self { +            CarHeader::V1(_) => 1, +        } +    } +} + +/// CAR file header version 1. +#[derive(Debug, Clone, Default, ipld::DagCbor, PartialEq, Eq)] +pub struct CarHeaderV1 { +    #[ipld] +    pub roots: Vec<Cid>, +    #[ipld] +    pub version: u64, +} + +impl CarHeaderV1 { +    /// Creates a new CAR file header +    pub fn new(roots: Vec<Cid>, version: u64) -> Self { +        Self { roots, version } +    } +} + +impl From<Vec<Cid>> for CarHeaderV1 { +    fn from(roots: Vec<Cid>) -> Self { +        Self { roots, version: 1 } +    } +} + +#[cfg(test)] +mod tests { +    use ipld::codec::{Decode, Encode}; +    use ipld_cbor::DagCborCodec; +    use multihash::MultihashDigest; + +    use super::*; + +    #[test] +    fn symmetric_header_v1() { +        let digest = multihash::Code::Blake2b256.digest(b"test"); +        let cid = Cid::new_v1(DagCborCodec.into(), digest); + +        let header = CarHeaderV1::from(vec![cid]); + +        let mut bytes = Vec::new(); +        header.encode(DagCborCodec, &mut bytes).unwrap(); + +        assert_eq!( +            CarHeaderV1::decode(DagCborCodec, &mut std::io::Cursor::new(&bytes)).unwrap(), +            header +        ); +    } +} diff --git a/iroh-car/src/lib.rs b/iroh-car/src/lib.rs new file mode 100644 index 0000000..d4e5f66 --- /dev/null +++ b/iroh-car/src/lib.rs @@ -0,0 +1,11 @@ +//! Implementation of the [car](https://ipld.io/specs/transport/car/) format. + +mod error; +mod header; +mod reader; +mod util; +mod writer; + +pub use crate::header::CarHeader; +pub use crate::reader::CarReader; +pub use crate::writer::CarWriter; diff --git a/iroh-car/src/reader.rs b/iroh-car/src/reader.rs new file mode 100644 index 0000000..c0209be --- /dev/null +++ b/iroh-car/src/reader.rs @@ -0,0 +1,99 @@ +use cid::Cid; +use futures::Stream; +use tokio::io::AsyncRead; + +use crate::{ +    error::Error, +    header::CarHeader, +    util::{ld_read, read_node}, +}; + +/// Reads CAR files that are in a BufReader +pub struct CarReader<R> { +    reader: R, +    header: CarHeader, +    buffer: Vec<u8>, +} + +impl<R> CarReader<R> +where +    R: AsyncRead + Send + Unpin, +{ +    /// Creates a new CarReader and parses the CarHeader +    pub async fn new(mut reader: R) -> Result<Self, Error> { +        let mut buffer = Vec::new(); + +        match ld_read(&mut reader, &mut buffer).await? { +            Some(buf) => { +                let header = CarHeader::decode(buf)?; + +                Ok(CarReader { +                    reader, +                    header, +                    buffer, +                }) +            } +            None => Err(Error::Parsing( +                "failed to parse uvarint for header".to_string(), +            )), +        } +    } + +    /// Returns the header of this car file. +    pub fn header(&self) -> &CarHeader { +        &self.header +    } + +    /// Returns the next IPLD Block in the buffer +    pub async fn next_block(&mut self) -> Result<Option<(Cid, Vec<u8>)>, Error> { +        read_node(&mut self.reader, &mut self.buffer).await +    } + +    pub fn stream(self) -> impl Stream<Item = Result<(Cid, Vec<u8>), Error>> { +        futures::stream::try_unfold(self, |mut this| async move { +            let maybe_block = read_node(&mut this.reader, &mut this.buffer).await?; +            Ok(maybe_block.map(|b| (b, this))) +        }) +    } +} + +#[cfg(test)] +mod tests { +    use std::io::Cursor; + +    use cid::Cid; +    use futures::TryStreamExt; +    use ipld_cbor::DagCborCodec; +    use multihash::MultihashDigest; + +    use crate::{header::CarHeaderV1, writer::CarWriter}; + +    use super::*; + +    #[tokio::test] +    async fn car_write_read() { +        let digest_test = multihash::Code::Blake2b256.digest(b"test"); +        let cid_test = Cid::new_v1(DagCborCodec.into(), digest_test); + +        let digest_foo = multihash::Code::Blake2b256.digest(b"foo"); +        let cid_foo = Cid::new_v1(DagCborCodec.into(), digest_foo); + +        let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo])); + +        let mut buffer = Vec::new(); +        let mut writer = CarWriter::new(header, &mut buffer); +        writer.write(cid_test, b"test").await.unwrap(); +        writer.write(cid_foo, b"foo").await.unwrap(); +        writer.finish().await.unwrap(); + +        let reader = Cursor::new(&buffer); +        let car_reader = CarReader::new(reader).await.unwrap(); +        let files: Vec<_> = car_reader.stream().try_collect().await.unwrap(); + +        assert_eq!(files.len(), 2); +        assert_eq!(files[0].0, cid_test); +        assert_eq!(files[0].1, b"test"); +        assert_eq!(files[1].0, cid_foo); +        assert_eq!(files[1].1, b"foo"); +    } +} diff --git a/iroh-car/src/util.rs b/iroh-car/src/util.rs new file mode 100644 index 0000000..25be761 --- /dev/null +++ b/iroh-car/src/util.rs @@ -0,0 +1,95 @@ +use cid::Cid; +use integer_encoding::VarIntAsyncReader; +use tokio::io::{AsyncRead, AsyncReadExt}; + +use super::error::Error; + +/// Maximum size that is used for single node. +pub(crate) const MAX_ALLOC: usize = 4 * 1024 * 1024; + +pub(crate) async fn ld_read<R>(mut reader: R, buf: &mut Vec<u8>) -> Result<Option<&[u8]>, Error> +where +    R: AsyncRead + Send + Unpin, +{ +    let length: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await { +        Ok(len) => len, +        Err(e) => { +            if e.kind() == std::io::ErrorKind::UnexpectedEof { +                return Ok(None); +            } +            return Err(Error::Parsing(e.to_string())); +        } +    }; + +    if length > MAX_ALLOC { +        return Err(Error::LdReadTooLarge(length)); +    } +    if length > buf.len() { +        buf.resize(length, 0); +    } + +    reader +        .read_exact(&mut buf[..length]) +        .await +        .map_err(|e| Error::Parsing(e.to_string()))?; + +    Ok(Some(&buf[..length])) +} + +pub(crate) async fn read_node<R>( +    buf_reader: &mut R, +    buf: &mut Vec<u8>, +) -> Result<Option<(Cid, Vec<u8>)>, Error> +where +    R: AsyncRead + Send + Unpin, +{ +    if let Some(buf) = ld_read(buf_reader, buf).await? { +        let mut cursor = std::io::Cursor::new(buf); +        let c = Cid::read_bytes(&mut cursor)?; +        let pos = cursor.position() as usize; + +        return Ok(Some((c, buf[pos..].to_vec()))); +    } +    Ok(None) +} + +#[cfg(test)] +mod tests { +    use integer_encoding::VarIntAsyncWriter; +    use tokio::io::{AsyncWrite, AsyncWriteExt}; + +    use super::*; + +    async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error> +    where +        W: AsyncWrite + Send + Unpin, +    { +        writer.write_varint_async(bytes.len()).await?; +        writer.write_all(bytes).await?; +        writer.flush().await?; +        Ok(()) +    } + +    #[tokio::test] +    async fn ld_read_write_good() { +        let mut buffer = Vec::<u8>::new(); +        ld_write(&mut buffer, b"test bytes").await.unwrap(); +        let reader = std::io::Cursor::new(buffer); + +        let mut buffer = vec![1u8; 1024]; +        let read = ld_read(reader, &mut buffer).await.unwrap().unwrap(); +        assert_eq!(read, b"test bytes"); +    } + +    #[tokio::test] +    async fn ld_read_write_fail() { +        let mut buffer = Vec::<u8>::new(); +        let size = MAX_ALLOC + 1; +        ld_write(&mut buffer, &vec![2u8; size]).await.unwrap(); +        let reader = std::io::Cursor::new(buffer); + +        let mut buffer = vec![1u8; 1024]; +        let read = ld_read(reader, &mut buffer).await; +        assert!(matches!(read, Err(Error::LdReadTooLarge(_)))); +    } +} diff --git a/iroh-car/src/writer.rs b/iroh-car/src/writer.rs new file mode 100644 index 0000000..9f17eb9 --- /dev/null +++ b/iroh-car/src/writer.rs @@ -0,0 +1,71 @@ +use cid::Cid; +use integer_encoding::VarIntAsyncWriter; +use tokio::io::{AsyncWrite, AsyncWriteExt}; + +use crate::{error::Error, header::CarHeader}; + +#[derive(Debug)] +pub struct CarWriter<W> { +    header: CarHeader, +    writer: W, +    cid_buffer: Vec<u8>, +    is_header_written: bool, +} + +impl<W> CarWriter<W> +where +    W: AsyncWrite + Send + Unpin, +{ +    pub fn new(header: CarHeader, writer: W) -> Self { +        CarWriter { +            header, +            writer, +            cid_buffer: Vec::new(), +            is_header_written: false, +        } +    } + +    /// Writes header and stream of data to writer in Car format. +    pub async fn write<T>(&mut self, cid: Cid, data: T) -> Result<(), Error> +    where +        T: AsRef<[u8]>, +    { +        if !self.is_header_written { +            // Write header bytes +            let header_bytes = self.header.encode()?; +            self.writer.write_varint_async(header_bytes.len()).await?; +            self.writer.write_all(&header_bytes).await?; +            self.is_header_written = true; +        } + +        // Write the given block. +        self.cid_buffer.clear(); +        cid.write_bytes(&mut self.cid_buffer).expect("vec write"); + +        let data = data.as_ref(); +        let len = self.cid_buffer.len() + data.len(); + +        self.writer.write_varint_async(len).await?; +        self.writer.write_all(&self.cid_buffer).await?; +        self.writer.write_all(data).await?; + +        Ok(()) +    } + +    /// Finishes writing, including flushing and returns the writer. +    pub async fn finish(mut self) -> Result<W, Error> { +        self.flush().await?; +        Ok(self.writer) +    } + +    /// Flushes the underlying writer. +    pub async fn flush(&mut self) -> Result<(), Error> { +        self.writer.flush().await?; +        Ok(()) +    } + +    /// Consumes the [`CarWriter`] and returns the underlying writer. +    pub fn into_inner(self) -> W { +        self.writer +    } +} | 
