summaryrefslogtreecommitdiffstats
path: root/iroh-car/src
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-10-28 20:00:55 -0700
committerBryan Newbold <bnewbold@robocracy.org>2022-10-28 20:00:55 -0700
commitf99cff96758176701656285712246173db76ac58 (patch)
tree10a4f8c167c055b39a96a342f9a9334ec877414e /iroh-car/src
parent4b4eb3aa819ee16cd8d7074cfd21d94deab2276d (diff)
downloadadenosine-f99cff96758176701656285712246173db76ac58.tar.gz
adenosine-f99cff96758176701656285712246173db76ac58.zip
copy in iroh-car from iroh repo, verbatim
Diffstat (limited to 'iroh-car/src')
-rw-r--r--iroh-car/src/error.rs28
-rw-r--r--iroh-car/src/header.rs104
-rw-r--r--iroh-car/src/lib.rs11
-rw-r--r--iroh-car/src/reader.rs99
-rw-r--r--iroh-car/src/util.rs95
-rw-r--r--iroh-car/src/writer.rs71
6 files changed, 408 insertions, 0 deletions
diff --git a/iroh-car/src/error.rs b/iroh-car/src/error.rs
new file mode 100644
index 0000000..3579413
--- /dev/null
+++ b/iroh-car/src/error.rs
@@ -0,0 +1,28 @@
+use thiserror::Error;
+
+/// Car utility error
+#[derive(Debug, Error)]
+pub enum Error {
+ #[error("Failed to parse CAR file: {0}")]
+ Parsing(String),
+ #[error("Invalid CAR file: {0}")]
+ InvalidFile(String),
+ #[error("Io error: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("Cbor encoding error: {0}")]
+ Cbor(#[from] ipld::error::Error),
+ #[error("ld read too large {0}")]
+ LdReadTooLarge(usize),
+}
+
+impl From<cid::Error> for Error {
+ fn from(err: cid::Error) -> Error {
+ Error::Parsing(err.to_string())
+ }
+}
+
+impl From<cid::multihash::Error> for Error {
+ fn from(err: cid::multihash::Error) -> Error {
+ Error::Parsing(err.to_string())
+ }
+}
diff --git a/iroh-car/src/header.rs b/iroh-car/src/header.rs
new file mode 100644
index 0000000..c004e35
--- /dev/null
+++ b/iroh-car/src/header.rs
@@ -0,0 +1,104 @@
+use cid::Cid;
+use ipld::codec::Codec;
+use ipld_cbor::DagCborCodec;
+
+use crate::error::Error;
+
+/// A car header.
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[non_exhaustive]
+pub enum CarHeader {
+ V1(CarHeaderV1),
+}
+
+impl CarHeader {
+ pub fn new_v1(roots: Vec<Cid>) -> Self {
+ Self::V1(roots.into())
+ }
+
+ pub fn decode(buffer: &[u8]) -> Result<Self, Error> {
+ let header: CarHeaderV1 = DagCborCodec
+ .decode(buffer)
+ .map_err(|e| Error::Parsing(e.to_string()))?;
+
+ if header.roots.is_empty() {
+ return Err(Error::Parsing("empty CAR file".to_owned()));
+ }
+
+ if header.version != 1 {
+ return Err(Error::InvalidFile(
+ "Only CAR file version 1 is supported".to_string(),
+ ));
+ }
+
+ Ok(CarHeader::V1(header))
+ }
+
+ pub fn encode(&self) -> Result<Vec<u8>, Error> {
+ match self {
+ CarHeader::V1(ref header) => {
+ let res = DagCborCodec.encode(header)?;
+ Ok(res)
+ }
+ }
+ }
+
+ pub fn roots(&self) -> &[Cid] {
+ match self {
+ CarHeader::V1(header) => &header.roots,
+ }
+ }
+
+ pub fn version(&self) -> u64 {
+ match self {
+ CarHeader::V1(_) => 1,
+ }
+ }
+}
+
+/// CAR file header version 1.
+#[derive(Debug, Clone, Default, ipld::DagCbor, PartialEq, Eq)]
+pub struct CarHeaderV1 {
+ #[ipld]
+ pub roots: Vec<Cid>,
+ #[ipld]
+ pub version: u64,
+}
+
+impl CarHeaderV1 {
+ /// Creates a new CAR file header
+ pub fn new(roots: Vec<Cid>, version: u64) -> Self {
+ Self { roots, version }
+ }
+}
+
+impl From<Vec<Cid>> for CarHeaderV1 {
+ fn from(roots: Vec<Cid>) -> Self {
+ Self { roots, version: 1 }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use ipld::codec::{Decode, Encode};
+ use ipld_cbor::DagCborCodec;
+ use multihash::MultihashDigest;
+
+ use super::*;
+
+ #[test]
+ fn symmetric_header_v1() {
+ let digest = multihash::Code::Blake2b256.digest(b"test");
+ let cid = Cid::new_v1(DagCborCodec.into(), digest);
+
+ let header = CarHeaderV1::from(vec![cid]);
+
+ let mut bytes = Vec::new();
+ header.encode(DagCborCodec, &mut bytes).unwrap();
+
+ assert_eq!(
+ CarHeaderV1::decode(DagCborCodec, &mut std::io::Cursor::new(&bytes)).unwrap(),
+ header
+ );
+ }
+}
diff --git a/iroh-car/src/lib.rs b/iroh-car/src/lib.rs
new file mode 100644
index 0000000..d4e5f66
--- /dev/null
+++ b/iroh-car/src/lib.rs
@@ -0,0 +1,11 @@
+//! Implementation of the [car](https://ipld.io/specs/transport/car/) format.
+
+mod error;
+mod header;
+mod reader;
+mod util;
+mod writer;
+
+pub use crate::header::CarHeader;
+pub use crate::reader::CarReader;
+pub use crate::writer::CarWriter;
diff --git a/iroh-car/src/reader.rs b/iroh-car/src/reader.rs
new file mode 100644
index 0000000..c0209be
--- /dev/null
+++ b/iroh-car/src/reader.rs
@@ -0,0 +1,99 @@
+use cid::Cid;
+use futures::Stream;
+use tokio::io::AsyncRead;
+
+use crate::{
+ error::Error,
+ header::CarHeader,
+ util::{ld_read, read_node},
+};
+
+/// Reads CAR files that are in a BufReader
+pub struct CarReader<R> {
+ reader: R,
+ header: CarHeader,
+ buffer: Vec<u8>,
+}
+
+impl<R> CarReader<R>
+where
+ R: AsyncRead + Send + Unpin,
+{
+ /// Creates a new CarReader and parses the CarHeader
+ pub async fn new(mut reader: R) -> Result<Self, Error> {
+ let mut buffer = Vec::new();
+
+ match ld_read(&mut reader, &mut buffer).await? {
+ Some(buf) => {
+ let header = CarHeader::decode(buf)?;
+
+ Ok(CarReader {
+ reader,
+ header,
+ buffer,
+ })
+ }
+ None => Err(Error::Parsing(
+ "failed to parse uvarint for header".to_string(),
+ )),
+ }
+ }
+
+ /// Returns the header of this car file.
+ pub fn header(&self) -> &CarHeader {
+ &self.header
+ }
+
+ /// Returns the next IPLD Block in the buffer
+ pub async fn next_block(&mut self) -> Result<Option<(Cid, Vec<u8>)>, Error> {
+ read_node(&mut self.reader, &mut self.buffer).await
+ }
+
+ pub fn stream(self) -> impl Stream<Item = Result<(Cid, Vec<u8>), Error>> {
+ futures::stream::try_unfold(self, |mut this| async move {
+ let maybe_block = read_node(&mut this.reader, &mut this.buffer).await?;
+ Ok(maybe_block.map(|b| (b, this)))
+ })
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::io::Cursor;
+
+ use cid::Cid;
+ use futures::TryStreamExt;
+ use ipld_cbor::DagCborCodec;
+ use multihash::MultihashDigest;
+
+ use crate::{header::CarHeaderV1, writer::CarWriter};
+
+ use super::*;
+
+ #[tokio::test]
+ async fn car_write_read() {
+ let digest_test = multihash::Code::Blake2b256.digest(b"test");
+ let cid_test = Cid::new_v1(DagCborCodec.into(), digest_test);
+
+ let digest_foo = multihash::Code::Blake2b256.digest(b"foo");
+ let cid_foo = Cid::new_v1(DagCborCodec.into(), digest_foo);
+
+ let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo]));
+
+ let mut buffer = Vec::new();
+ let mut writer = CarWriter::new(header, &mut buffer);
+ writer.write(cid_test, b"test").await.unwrap();
+ writer.write(cid_foo, b"foo").await.unwrap();
+ writer.finish().await.unwrap();
+
+ let reader = Cursor::new(&buffer);
+ let car_reader = CarReader::new(reader).await.unwrap();
+ let files: Vec<_> = car_reader.stream().try_collect().await.unwrap();
+
+ assert_eq!(files.len(), 2);
+ assert_eq!(files[0].0, cid_test);
+ assert_eq!(files[0].1, b"test");
+ assert_eq!(files[1].0, cid_foo);
+ assert_eq!(files[1].1, b"foo");
+ }
+}
diff --git a/iroh-car/src/util.rs b/iroh-car/src/util.rs
new file mode 100644
index 0000000..25be761
--- /dev/null
+++ b/iroh-car/src/util.rs
@@ -0,0 +1,95 @@
+use cid::Cid;
+use integer_encoding::VarIntAsyncReader;
+use tokio::io::{AsyncRead, AsyncReadExt};
+
+use super::error::Error;
+
+/// Maximum size that is used for single node.
+pub(crate) const MAX_ALLOC: usize = 4 * 1024 * 1024;
+
+pub(crate) async fn ld_read<R>(mut reader: R, buf: &mut Vec<u8>) -> Result<Option<&[u8]>, Error>
+where
+ R: AsyncRead + Send + Unpin,
+{
+ let length: usize = match VarIntAsyncReader::read_varint_async(&mut reader).await {
+ Ok(len) => len,
+ Err(e) => {
+ if e.kind() == std::io::ErrorKind::UnexpectedEof {
+ return Ok(None);
+ }
+ return Err(Error::Parsing(e.to_string()));
+ }
+ };
+
+ if length > MAX_ALLOC {
+ return Err(Error::LdReadTooLarge(length));
+ }
+ if length > buf.len() {
+ buf.resize(length, 0);
+ }
+
+ reader
+ .read_exact(&mut buf[..length])
+ .await
+ .map_err(|e| Error::Parsing(e.to_string()))?;
+
+ Ok(Some(&buf[..length]))
+}
+
+pub(crate) async fn read_node<R>(
+ buf_reader: &mut R,
+ buf: &mut Vec<u8>,
+) -> Result<Option<(Cid, Vec<u8>)>, Error>
+where
+ R: AsyncRead + Send + Unpin,
+{
+ if let Some(buf) = ld_read(buf_reader, buf).await? {
+ let mut cursor = std::io::Cursor::new(buf);
+ let c = Cid::read_bytes(&mut cursor)?;
+ let pos = cursor.position() as usize;
+
+ return Ok(Some((c, buf[pos..].to_vec())));
+ }
+ Ok(None)
+}
+
+#[cfg(test)]
+mod tests {
+ use integer_encoding::VarIntAsyncWriter;
+ use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+ use super::*;
+
+ async fn ld_write<'a, W>(writer: &mut W, bytes: &[u8]) -> Result<(), Error>
+ where
+ W: AsyncWrite + Send + Unpin,
+ {
+ writer.write_varint_async(bytes.len()).await?;
+ writer.write_all(bytes).await?;
+ writer.flush().await?;
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn ld_read_write_good() {
+ let mut buffer = Vec::<u8>::new();
+ ld_write(&mut buffer, b"test bytes").await.unwrap();
+ let reader = std::io::Cursor::new(buffer);
+
+ let mut buffer = vec![1u8; 1024];
+ let read = ld_read(reader, &mut buffer).await.unwrap().unwrap();
+ assert_eq!(read, b"test bytes");
+ }
+
+ #[tokio::test]
+ async fn ld_read_write_fail() {
+ let mut buffer = Vec::<u8>::new();
+ let size = MAX_ALLOC + 1;
+ ld_write(&mut buffer, &vec![2u8; size]).await.unwrap();
+ let reader = std::io::Cursor::new(buffer);
+
+ let mut buffer = vec![1u8; 1024];
+ let read = ld_read(reader, &mut buffer).await;
+ assert!(matches!(read, Err(Error::LdReadTooLarge(_))));
+ }
+}
diff --git a/iroh-car/src/writer.rs b/iroh-car/src/writer.rs
new file mode 100644
index 0000000..9f17eb9
--- /dev/null
+++ b/iroh-car/src/writer.rs
@@ -0,0 +1,71 @@
+use cid::Cid;
+use integer_encoding::VarIntAsyncWriter;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+use crate::{error::Error, header::CarHeader};
+
+#[derive(Debug)]
+pub struct CarWriter<W> {
+ header: CarHeader,
+ writer: W,
+ cid_buffer: Vec<u8>,
+ is_header_written: bool,
+}
+
+impl<W> CarWriter<W>
+where
+ W: AsyncWrite + Send + Unpin,
+{
+ pub fn new(header: CarHeader, writer: W) -> Self {
+ CarWriter {
+ header,
+ writer,
+ cid_buffer: Vec::new(),
+ is_header_written: false,
+ }
+ }
+
+ /// Writes header and stream of data to writer in Car format.
+ pub async fn write<T>(&mut self, cid: Cid, data: T) -> Result<(), Error>
+ where
+ T: AsRef<[u8]>,
+ {
+ if !self.is_header_written {
+ // Write header bytes
+ let header_bytes = self.header.encode()?;
+ self.writer.write_varint_async(header_bytes.len()).await?;
+ self.writer.write_all(&header_bytes).await?;
+ self.is_header_written = true;
+ }
+
+ // Write the given block.
+ self.cid_buffer.clear();
+ cid.write_bytes(&mut self.cid_buffer).expect("vec write");
+
+ let data = data.as_ref();
+ let len = self.cid_buffer.len() + data.len();
+
+ self.writer.write_varint_async(len).await?;
+ self.writer.write_all(&self.cid_buffer).await?;
+ self.writer.write_all(data).await?;
+
+ Ok(())
+ }
+
+ /// Finishes writing, including flushing and returns the writer.
+ pub async fn finish(mut self) -> Result<W, Error> {
+ self.flush().await?;
+ Ok(self.writer)
+ }
+
+ /// Flushes the underlying writer.
+ pub async fn flush(&mut self) -> Result<(), Error> {
+ self.writer.flush().await?;
+ Ok(())
+ }
+
+ /// Consumes the [`CarWriter`] and returns the underlying writer.
+ pub fn into_inner(self) -> W {
+ self.writer
+ }
+}