summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock61
-rw-r--r--adenosine-pds/Cargo.toml4
-rw-r--r--adenosine-pds/src/atp_db.sql50
-rw-r--r--adenosine-pds/src/bin/adenosine-pds.rs6
-rw-r--r--adenosine-pds/src/car.rs61
-rw-r--r--adenosine-pds/src/db.rs137
-rw-r--r--adenosine-pds/src/lib.rs71
-rw-r--r--adenosine-pds/src/models.rs8
-rw-r--r--adenosine-pds/src/mst.rs11
-rw-r--r--adenosine-pds/tests/test_repro_mst.rs1
10 files changed, 348 insertions, 62 deletions
diff --git a/Cargo.lock b/Cargo.lock
index e944657..84dc965 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -35,16 +35,20 @@ name = "adenosine-pds"
version = "0.1.0-dev.0"
dependencies = [
"anyhow",
+ "bcrypt",
"futures",
"ipfs-sqlite-block-store",
"iroh-car",
"jsonschema",
+ "lazy_static",
"libipld",
"log",
"pretty_env_logger",
"rouille",
"rusqlite",
+ "rusqlite_migration",
"schemafy",
+ "serde",
"serde_json",
"sha256",
"structopt",
@@ -244,6 +248,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
+name = "bcrypt"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7e7c93a3fb23b2fdde989b2c9ec4dd153063ec81f408507f84c090cd91c6641"
+dependencies = [
+ "base64",
+ "blowfish",
+ "getrandom 0.2.8",
+ "zeroize",
+]
+
+[[package]]
name = "bit-set"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -324,6 +340,16 @@ dependencies = [
]
[[package]]
+name = "blowfish"
+version = "0.9.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e412e2cd0f2b2d93e02543ceae7917b3c70331573df19ee046bcbc35e45e87d7"
+dependencies = [
+ "byteorder",
+ "cipher",
+]
+
+[[package]]
name = "brotli"
version = "3.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -516,6 +542,16 @@ dependencies = [
]
[[package]]
+name = "cipher"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e"
+dependencies = [
+ "crypto-common",
+ "inout",
+]
+
+[[package]]
name = "clap"
version = "2.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1846,6 +1882,15 @@ dependencies = [
]
[[package]]
+name = "inout"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3411,6 +3456,16 @@ dependencies = [
]
[[package]]
+name = "rusqlite_migration"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e2a7ba8908f9b41e2b240a3de0790517c7a636df1f7bb7612276a2d0a42f9ce2"
+dependencies = [
+ "log",
+ "rusqlite",
+]
+
+[[package]]
name = "rustc_version"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -5275,3 +5330,9 @@ name = "yansi"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec"
+
+[[package]]
+name = "zeroize"
+version = "1.5.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f"
diff --git a/adenosine-pds/Cargo.toml b/adenosine-pds/Cargo.toml
index 7f6e84a..48154e7 100644
--- a/adenosine-pds/Cargo.toml
+++ b/adenosine-pds/Cargo.toml
@@ -15,12 +15,14 @@ repository = "https://gitlab.com/bnewbold/adenosine"
[dependencies]
anyhow = "*"
structopt = "*"
+serde = "1"
serde_json = "*"
log = "*"
pretty_env_logger = "*"
libipld = "*"
ipfs-sqlite-block-store = "*"
rusqlite = { version = "*", features = ["bundled"] }
+rusqlite_migration = "*"
jsonschema = "*"
schemafy = "*"
rouille = "*"
@@ -28,6 +30,8 @@ iroh-car = { version = "0.1.0-vendored.0", path = "../iroh-car" }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
sha256 = "*"
+lazy_static = "*"
+bcrypt = "0.13"
[package.metadata.deb]
maintainer = "Bryan Newbold <bnewbold@robocracy.org>"
diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql
new file mode 100644
index 0000000..918a89c
--- /dev/null
+++ b/adenosine-pds/src/atp_db.sql
@@ -0,0 +1,50 @@
+
+----------- atproto system tables
+
+CREATE TABLE account(
+ did TEXT PRIMARY KEY NOT NULL,
+ username TEXT NOT NULL,
+ email TEXT NOT NULL,
+ password_bcrypt TEXT NOT NULL,
+ signing_key TEXT NOT NULL,
+);
+CREATE UNIQUE INDEX account_username_uniq_idx on account(lower(username));
+CREATE UNIQUE INDEX account_email_uniq_idx on account(lower(email));
+
+CREATE TABLE did_doc(
+ did TEXT PRIMARY KEY NOT NULL,
+ doc_json TEXT NOT NULL,
+ seen_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
+);
+
+CREATE TABLE session(
+ did TEXT NOT NULL,
+ jwt TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
+ PRIMARY KEY(did, jwt)
+);
+
+CREATE TABLE repo(
+ did TEXT PRIMARY KEY NOT NULL,
+ head_commit TEXT NOT NULL,
+);
+
+CREATE TABLE record(
+ did TEXT NOT NULL,
+ collection TEXT NOT NULL,
+ tid TEXT NOT NULL,
+ record_cid TEXT NOT NULL,
+ record_json TEXT NOT NULL,
+ PRIMARY KEY(did, collection, tid)
+);
+
+CREATE TABLE password_reset(
+ did TEXT NOT NULL,
+ token TEXT NOT NULL,
+ PRIMARY KEY(did, token)
+);
+
+
+----------- bsky app/index tables
+
+
diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs
index 44c4cef..b76d015 100644
--- a/adenosine-pds/src/bin/adenosine-pds.rs
+++ b/adenosine-pds/src/bin/adenosine-pds.rs
@@ -80,11 +80,9 @@ fn main() -> Result<()> {
match opt.cmd {
Command::Serve { port } => {
// TODO: log some config stuff?
- run_server(port)
- }
- Command::Import { car_path } => {
- load_car_to_sqlite(&opt.blockstore_db_path, &car_path)
+ run_server(port, &opt.blockstore_db_path, &opt.atp_db_path)
}
+ Command::Import { car_path } => load_car_to_sqlite(&opt.blockstore_db_path, &car_path),
Command::Inspect {} => dump_mst_keys(&opt.blockstore_db_path),
}
}
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs
new file mode 100644
index 0000000..5731848
--- /dev/null
+++ b/adenosine-pds/src/car.rs
@@ -0,0 +1,61 @@
+use anyhow::Result;
+
+use futures::TryStreamExt;
+use ipfs_sqlite_block_store::BlockStore;
+use iroh_car::CarReader;
+use libipld::block::Block;
+use std::path::PathBuf;
+use tokio::fs::File;
+use tokio::io::BufReader;
+
+pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> {
+ let mut db: BlockStore<libipld::DefaultParams> =
+ { BlockStore::open(db_path, Default::default())? };
+
+ load_car_to_blockstore(&mut db, car_path)
+}
+
+pub fn load_car_to_blockstore(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_path: &PathBuf,
+) -> Result<()> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?;
+ rt.block_on(inner_car_loader(db, car_path))
+}
+
+// this async function is wrapped in the sync version above
+async fn inner_car_loader(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_path: &PathBuf,
+) -> Result<()> {
+ println!(
+ "{} - {}",
+ std::env::current_dir()?.display(),
+ car_path.display()
+ );
+ let car_reader = {
+ let file = File::open(car_path).await?;
+ let buf_reader = BufReader::new(file);
+ CarReader::new(buf_reader).await?
+ };
+ let car_header = car_reader.header().clone();
+
+ car_reader
+ .stream()
+ .try_for_each(|(cid, raw)| {
+ // TODO: error handling here instead of unwrap (?)
+ let block = Block::new(cid, raw).unwrap();
+ db.put_block(block, None).unwrap();
+ futures::future::ready(Ok(()))
+ })
+ .await?;
+
+ // pin the header
+ if car_header.roots().len() >= 1 {
+ db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?;
+ }
+
+ Ok(())
+}
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
new file mode 100644
index 0000000..1d6d4ed
--- /dev/null
+++ b/adenosine-pds/src/db.rs
@@ -0,0 +1,137 @@
+use crate::AtpSession;
+/// ATP database (as distinct from blockstore)
+use anyhow::{anyhow, Result};
+use lazy_static::lazy_static;
+use rusqlite::{params, Connection};
+use rusqlite_migration::{Migrations, M};
+use serde_json::Value;
+use std::path::PathBuf;
+use std::str::FromStr;
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn migrations_test() {
+ assert!(MIGRATIONS.validate().is_ok());
+ }
+}
+
+lazy_static! {
+ static ref MIGRATIONS: Migrations<'static> =
+ Migrations::new(vec![M::up(include_str!("atp_db.sql")),]);
+}
+
+#[derive(Debug)]
+pub struct AtpDatabase {
+ conn: Connection,
+}
+
+impl AtpDatabase {
+ pub fn open(path: &PathBuf) -> Result<Self> {
+ let mut conn = Connection::open(path)?;
+ MIGRATIONS.to_latest(&mut conn)?;
+ // any pragma would happen here
+ Ok(AtpDatabase { conn })
+ }
+
+ /// temporary database, eg for tests.
+ ///
+ /// TODO: should create a tmp file on ramdisk (/var/tmp?) instead of opening an in-memory
+ /// database. in-memory database can't be used with multiple connections
+ pub fn open_ephemeral() -> Result<Self> {
+ let mut conn = Connection::open_in_memory()?;
+ MIGRATIONS.to_latest(&mut conn)?;
+ // any pragma would happen here
+ Ok(AtpDatabase { conn })
+ }
+
+ /// Creates an entirely new connection to the same database
+ ///
+ /// Skips re-running migrations.
+ ///
+ /// Fails for ephemeral databases.
+ pub fn new_connection(&self) -> Result<Self> {
+ // TODO: let path = std::path::PathBuf::from(self.conn.path().ok_or(Err(anyhow!("expected real database")))?);
+ let path = std::path::PathBuf::from(self.conn.path().expect("expected real database"));
+ let conn = Connection::open(path)?;
+ Ok(AtpDatabase { conn })
+ }
+
+ pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result<Value> {
+ let mut stmt = self.conn.prepare_cached(
+ "SELECT record_json FROM record WHERE did = ?1 collection = ?2 tid = ?3",
+ )?;
+ Ok(stmt.query_row(params!(did, collection, tid), |row| {
+ row.get(0).map(|v: String| Value::from_str(&v))
+ })??)
+ }
+
+ pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result<Vec<String>> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT tid FROM record WHERE did = ?1 collection = ?2")?;
+ let ret = stmt
+ .query_and_then(params!(did, collection), |row| {
+ let v: String = row.get(0)?;
+ Ok(v)
+ })?
+ .collect();
+ ret
+ }
+
+ pub fn get_collection_list(&mut self, did: &str) -> Result<Vec<String>> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?;
+ let ret = stmt
+ .query_and_then(params!(did), |row| {
+ let v: String = row.get(0)?;
+ Ok(v)
+ })?
+ .collect();
+ ret
+ }
+
+ pub fn create_account(
+ &mut self,
+ username: &str,
+ password: &str,
+ email: &str,
+ ) -> Result<AtpSession> {
+ // TODO: validate email (regex?)
+ // TODO: validate username
+ // TODO: generate and store signing key
+ // TODO: generate plc did (randomly for now?)
+ // TODO: insert did_doc
+ // TODO: also need to initialize repo with... profile?
+ {
+ let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
+ let signing_key = "key:TODO";
+ let did = "did:TODO";
+ let mut stmt = self
+ .conn
+ .prepare_cached("INSERT INTO account (username, password_bcrypt, email, did, signing_key) VALUES (?1, ?2, ?3, ?4, ?5)")?;
+ stmt.execute(params!(username, password_bcrypt, email, did, signing_key))?;
+ }
+ self.create_session(username, password)
+ }
+
+ pub fn create_session(&mut self, username: &str, password: &str) -> Result<AtpSession> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT password_bcrypt FROM account WHERE username = ?1")?;
+ let password_bcrypt: String = stmt.query_row(params!(username), |row| row.get(0))?;
+ if !bcrypt::verify(password, &password_bcrypt)? {
+ return Err(anyhow!("password did not match"));
+ }
+ // TODO: generate JWT
+ // TODO: insert session wtih JWT
+ Ok(AtpSession {
+ name: username.to_string(),
+ did: "did:TODO".to_string(),
+ jwt: "jwt:TODO".to_string(),
+ })
+ }
+}
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 1785640..a8dc46a 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,23 +1,29 @@
use anyhow::Result;
use log::{error, info};
use rouille::{router, Request, Response};
+use std::path::PathBuf;
+use std::sync::Mutex;
-use futures::TryStreamExt;
use ipfs_sqlite_block_store::BlockStore;
-use iroh_car::CarReader;
-use libipld::block::Block;
-use std::path::PathBuf;
-use tokio::fs::File;
-use tokio::io::BufReader;
+mod car;
+mod db;
+mod models;
mod mst;
+pub use car::{load_car_to_blockstore, load_car_to_sqlite};
+pub use db::AtpDatabase;
+pub use models::*;
pub use mst::{dump_mst_keys, repro_mst};
-pub fn run_server(port: u16) -> Result<()> {
- // TODO: log access requests
+pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> {
// TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
+ // TODO: could just open connection on every request?
+ let db = Mutex::new(AtpDatabase::open(atp_db_path)?);
+ let mut _blockstore: BlockStore<libipld::DefaultParams> =
+ BlockStore::open(blockstore_db_path, Default::default())?;
+
let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
info!("{} {} ({:?})", req.method(), req.raw_url(), elap);
};
@@ -43,50 +49,15 @@ pub fn run_server(port: u16) -> Result<()> {
Response::text("didn't get other thing")
// TODO: parse and echo back JSON body
},
+
+ (GET) ["/xrpc/com.atproto.getRecord"] => {
+ // TODO: JSON response
+ // TODO: handle error
+ let mut db = db.lock().unwrap().new_connection().unwrap();
+ Response::text(db.get_record("asdf", "123", "blah").unwrap().to_string())
+ },
_ => rouille::Response::empty_404()
)
})
});
}
-
-pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> {
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open(db_path, Default::default())? };
-
- load_car_to_blockstore(&mut db, car_path)
-}
-
-pub fn load_car_to_blockstore(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?;
- rt.block_on(inner_car_loader(db, car_path))
-}
-
-// this async function is wrapped in the sync version above
-async fn inner_car_loader(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> {
- println!("{} - {}", std::env::current_dir()?.display(), car_path.display());
- let car_reader = {
- let file = File::open(car_path).await?;
- let buf_reader = BufReader::new(file);
- CarReader::new(buf_reader).await?
- };
- let car_header = car_reader.header().clone();
-
- car_reader
- .stream()
- .try_for_each(|(cid, raw)| {
- // TODO: error handling here instead of unwrap (?)
- let block = Block::new(cid, raw).unwrap();
- db.put_block(block, None).unwrap();
- futures::future::ready(Ok(()))
- })
- .await?;
-
- // pin the header
- if car_header.roots().len() >= 1 {
- db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?;
- }
-
- Ok(())
-}
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
new file mode 100644
index 0000000..6f9bf81
--- /dev/null
+++ b/adenosine-pds/src/models.rs
@@ -0,0 +1,8 @@
+use serde;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct AtpSession {
+ pub jwt: String,
+ pub name: String,
+ pub did: String,
+}
diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs
index a2a394f..429d8c8 100644
--- a/adenosine-pds/src/mst.rs
+++ b/adenosine-pds/src/mst.rs
@@ -1,3 +1,4 @@
+use crate::load_car_to_blockstore;
use anyhow::{anyhow, Result};
use ipfs_sqlite_block_store::BlockStore;
use libipld::cbor::DagCborCodec;
@@ -9,7 +10,6 @@ use libipld::{Cid, DagCbor};
use log::{debug, error, info};
use std::collections::BTreeMap;
use std::path::PathBuf;
-use crate::load_car_to_blockstore;
#[derive(Debug, DagCbor, PartialEq, Eq)]
struct CommitNode {
@@ -85,8 +85,7 @@ fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Res
}
pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> {
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open(db_path, Default::default())? };
+ let mut db: BlockStore<libipld::DefaultParams> = BlockStore::open(db_path, Default::default())?;
let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?;
if all_aliases.is_empty() {
@@ -287,11 +286,9 @@ fn serialize_wip_tree(
}
pub fn repro_mst(car_path: &PathBuf) -> Result<()> {
-
// open a temp block store
- let mut db: BlockStore<libipld::DefaultParams> = {
- BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())?
- };
+ let mut db: BlockStore<libipld::DefaultParams> =
+ { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? };
// load CAR contents from file
load_car_to_blockstore(&mut db, car_path)?;
diff --git a/adenosine-pds/tests/test_repro_mst.rs b/adenosine-pds/tests/test_repro_mst.rs
index 4dde364..ad5fba1 100644
--- a/adenosine-pds/tests/test_repro_mst.rs
+++ b/adenosine-pds/tests/test_repro_mst.rs
@@ -1,4 +1,3 @@
-
use adenosine_pds::repro_mst;
use std::path::PathBuf;
use std::str::FromStr;