From eec74745e4c4af7d744509d66cb9b56481c471d3 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 1 Nov 2022 11:28:46 -0700 Subject: pds: more skeleton progress --- Cargo.lock | 61 +++++++++++++++ adenosine-pds/Cargo.toml | 4 + adenosine-pds/src/atp_db.sql | 50 ++++++++++++ adenosine-pds/src/bin/adenosine-pds.rs | 6 +- adenosine-pds/src/car.rs | 61 +++++++++++++++ adenosine-pds/src/db.rs | 137 +++++++++++++++++++++++++++++++++ adenosine-pds/src/lib.rs | 71 +++++------------ adenosine-pds/src/models.rs | 8 ++ adenosine-pds/src/mst.rs | 11 +-- adenosine-pds/tests/test_repro_mst.rs | 1 - 10 files changed, 348 insertions(+), 62 deletions(-) create mode 100644 adenosine-pds/src/atp_db.sql create mode 100644 adenosine-pds/src/car.rs create mode 100644 adenosine-pds/src/db.rs create mode 100644 adenosine-pds/src/models.rs diff --git a/Cargo.lock b/Cargo.lock index e944657..84dc965 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -35,16 +35,20 @@ name = "adenosine-pds" version = "0.1.0-dev.0" dependencies = [ "anyhow", + "bcrypt", "futures", "ipfs-sqlite-block-store", "iroh-car", "jsonschema", + "lazy_static", "libipld", "log", "pretty_env_logger", "rouille", "rusqlite", + "rusqlite_migration", "schemafy", + "serde", "serde_json", "sha256", "structopt", @@ -243,6 +247,18 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "bcrypt" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7e7c93a3fb23b2fdde989b2c9ec4dd153063ec81f408507f84c090cd91c6641" +dependencies = [ + "base64", + "blowfish", + "getrandom 0.2.8", + "zeroize", +] + [[package]] name = "bit-set" version = "0.5.3" @@ -323,6 +339,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blowfish" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e412e2cd0f2b2d93e02543ceae7917b3c70331573df19ee046bcbc35e45e87d7" +dependencies = [ + "byteorder", + "cipher", +] + [[package]] name = "brotli" version = "3.3.4" @@ -515,6 +541,16 @@ dependencies = [ "unsigned-varint", ] +[[package]] +name = "cipher" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "2.34.0" @@ -1845,6 +1881,15 @@ dependencies = [ "adler32", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -3410,6 +3455,16 @@ dependencies = [ "smallvec", ] +[[package]] +name = "rusqlite_migration" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2a7ba8908f9b41e2b240a3de0790517c7a636df1f7bb7612276a2d0a42f9ce2" +dependencies = [ + "log", + "rusqlite", +] + [[package]] name = "rustc_version" version = "0.3.3" @@ -5275,3 +5330,9 @@ name = "yansi" version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" + +[[package]] +name = "zeroize" +version = "1.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c394b5bd0c6f669e7275d9c20aa90ae064cb22e75a1cad54e1b34088034b149f" diff --git a/adenosine-pds/Cargo.toml b/adenosine-pds/Cargo.toml index 7f6e84a..48154e7 100644 --- a/adenosine-pds/Cargo.toml +++ b/adenosine-pds/Cargo.toml @@ -15,12 +15,14 @@ repository = "https://gitlab.com/bnewbold/adenosine" [dependencies] anyhow = "*" structopt = "*" +serde = "1" serde_json = "*" log = "*" pretty_env_logger = "*" libipld = "*" ipfs-sqlite-block-store = "*" rusqlite = { version = "*", features = ["bundled"] } +rusqlite_migration = "*" jsonschema = "*" schemafy = "*" rouille = "*" @@ -28,6 +30,8 @@ iroh-car = { version = "0.1.0-vendored.0", path = "../iroh-car" } tokio = { version = "1", features = ["full"] } futures = "0.3" sha256 = "*" +lazy_static = "*" +bcrypt = "0.13" [package.metadata.deb] maintainer = "Bryan Newbold " diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql new file mode 100644 index 0000000..918a89c --- /dev/null +++ b/adenosine-pds/src/atp_db.sql @@ -0,0 +1,50 @@ + +----------- atproto system tables + +CREATE TABLE account( + did TEXT PRIMARY KEY NOT NULL, + username TEXT NOT NULL, + email TEXT NOT NULL, + password_bcrypt TEXT NOT NULL, + signing_key TEXT NOT NULL, +); +CREATE UNIQUE INDEX account_username_uniq_idx on account(lower(username)); +CREATE UNIQUE INDEX account_email_uniq_idx on account(lower(email)); + +CREATE TABLE did_doc( + did TEXT PRIMARY KEY NOT NULL, + doc_json TEXT NOT NULL, + seen_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, +); + +CREATE TABLE session( + did TEXT NOT NULL, + jwt TEXT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + PRIMARY KEY(did, jwt) +); + +CREATE TABLE repo( + did TEXT PRIMARY KEY NOT NULL, + head_commit TEXT NOT NULL, +); + +CREATE TABLE record( + did TEXT NOT NULL, + collection TEXT NOT NULL, + tid TEXT NOT NULL, + record_cid TEXT NOT NULL, + record_json TEXT NOT NULL, + PRIMARY KEY(did, collection, tid) +); + +CREATE TABLE password_reset( + did TEXT NOT NULL, + token TEXT NOT NULL, + PRIMARY KEY(did, token) +); + + +----------- bsky app/index tables + + diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index 44c4cef..b76d015 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -80,11 +80,9 @@ fn main() -> Result<()> { match opt.cmd { Command::Serve { port } => { // TODO: log some config stuff? - run_server(port) - } - Command::Import { car_path } => { - load_car_to_sqlite(&opt.blockstore_db_path, &car_path) + run_server(port, &opt.blockstore_db_path, &opt.atp_db_path) } + Command::Import { car_path } => load_car_to_sqlite(&opt.blockstore_db_path, &car_path), Command::Inspect {} => dump_mst_keys(&opt.blockstore_db_path), } } diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs new file mode 100644 index 0000000..5731848 --- /dev/null +++ b/adenosine-pds/src/car.rs @@ -0,0 +1,61 @@ +use anyhow::Result; + +use futures::TryStreamExt; +use ipfs_sqlite_block_store::BlockStore; +use iroh_car::CarReader; +use libipld::block::Block; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::BufReader; + +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { + let mut db: BlockStore = + { BlockStore::open(db_path, Default::default())? }; + + load_car_to_blockstore(&mut db, car_path) +} + +pub fn load_car_to_blockstore( + db: &mut BlockStore, + car_path: &PathBuf, +) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_loader(db, car_path)) +} + +// this async function is wrapped in the sync version above +async fn inner_car_loader( + db: &mut BlockStore, + car_path: &PathBuf, +) -> Result<()> { + println!( + "{} - {}", + std::env::current_dir()?.display(), + car_path.display() + ); + let car_reader = { + let file = File::open(car_path).await?; + let buf_reader = BufReader::new(file); + CarReader::new(buf_reader).await? + }; + let car_header = car_reader.header().clone(); + + car_reader + .stream() + .try_for_each(|(cid, raw)| { + // TODO: error handling here instead of unwrap (?) + let block = Block::new(cid, raw).unwrap(); + db.put_block(block, None).unwrap(); + futures::future::ready(Ok(())) + }) + .await?; + + // pin the header + if car_header.roots().len() >= 1 { + db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; + } + + Ok(()) +} diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs new file mode 100644 index 0000000..1d6d4ed --- /dev/null +++ b/adenosine-pds/src/db.rs @@ -0,0 +1,137 @@ +use crate::AtpSession; +/// ATP database (as distinct from blockstore) +use anyhow::{anyhow, Result}; +use lazy_static::lazy_static; +use rusqlite::{params, Connection}; +use rusqlite_migration::{Migrations, M}; +use serde_json::Value; +use std::path::PathBuf; +use std::str::FromStr; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn migrations_test() { + assert!(MIGRATIONS.validate().is_ok()); + } +} + +lazy_static! { + static ref MIGRATIONS: Migrations<'static> = + Migrations::new(vec![M::up(include_str!("atp_db.sql")),]); +} + +#[derive(Debug)] +pub struct AtpDatabase { + conn: Connection, +} + +impl AtpDatabase { + pub fn open(path: &PathBuf) -> Result { + let mut conn = Connection::open(path)?; + MIGRATIONS.to_latest(&mut conn)?; + // any pragma would happen here + Ok(AtpDatabase { conn }) + } + + /// temporary database, eg for tests. + /// + /// TODO: should create a tmp file on ramdisk (/var/tmp?) instead of opening an in-memory + /// database. in-memory database can't be used with multiple connections + pub fn open_ephemeral() -> Result { + let mut conn = Connection::open_in_memory()?; + MIGRATIONS.to_latest(&mut conn)?; + // any pragma would happen here + Ok(AtpDatabase { conn }) + } + + /// Creates an entirely new connection to the same database + /// + /// Skips re-running migrations. + /// + /// Fails for ephemeral databases. + pub fn new_connection(&self) -> Result { + // TODO: let path = std::path::PathBuf::from(self.conn.path().ok_or(Err(anyhow!("expected real database")))?); + let path = std::path::PathBuf::from(self.conn.path().expect("expected real database")); + let conn = Connection::open(path)?; + Ok(AtpDatabase { conn }) + } + + pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result { + let mut stmt = self.conn.prepare_cached( + "SELECT record_json FROM record WHERE did = ?1 collection = ?2 tid = ?3", + )?; + Ok(stmt.query_row(params!(did, collection, tid), |row| { + row.get(0).map(|v: String| Value::from_str(&v)) + })??) + } + + pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result> { + let mut stmt = self + .conn + .prepare_cached("SELECT tid FROM record WHERE did = ?1 collection = ?2")?; + let ret = stmt + .query_and_then(params!(did, collection), |row| { + let v: String = row.get(0)?; + Ok(v) + })? + .collect(); + ret + } + + pub fn get_collection_list(&mut self, did: &str) -> Result> { + let mut stmt = self + .conn + .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?; + let ret = stmt + .query_and_then(params!(did), |row| { + let v: String = row.get(0)?; + Ok(v) + })? + .collect(); + ret + } + + pub fn create_account( + &mut self, + username: &str, + password: &str, + email: &str, + ) -> Result { + // TODO: validate email (regex?) + // TODO: validate username + // TODO: generate and store signing key + // TODO: generate plc did (randomly for now?) + // TODO: insert did_doc + // TODO: also need to initialize repo with... profile? + { + let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?; + let signing_key = "key:TODO"; + let did = "did:TODO"; + let mut stmt = self + .conn + .prepare_cached("INSERT INTO account (username, password_bcrypt, email, did, signing_key) VALUES (?1, ?2, ?3, ?4, ?5)")?; + stmt.execute(params!(username, password_bcrypt, email, did, signing_key))?; + } + self.create_session(username, password) + } + + pub fn create_session(&mut self, username: &str, password: &str) -> Result { + let mut stmt = self + .conn + .prepare_cached("SELECT password_bcrypt FROM account WHERE username = ?1")?; + let password_bcrypt: String = stmt.query_row(params!(username), |row| row.get(0))?; + if !bcrypt::verify(password, &password_bcrypt)? { + return Err(anyhow!("password did not match")); + } + // TODO: generate JWT + // TODO: insert session wtih JWT + Ok(AtpSession { + name: username.to_string(), + did: "did:TODO".to_string(), + jwt: "jwt:TODO".to_string(), + }) + } +} diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 1785640..a8dc46a 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,23 +1,29 @@ use anyhow::Result; use log::{error, info}; use rouille::{router, Request, Response}; +use std::path::PathBuf; +use std::sync::Mutex; -use futures::TryStreamExt; use ipfs_sqlite_block_store::BlockStore; -use iroh_car::CarReader; -use libipld::block::Block; -use std::path::PathBuf; -use tokio::fs::File; -use tokio::io::BufReader; +mod car; +mod db; +mod models; mod mst; +pub use car::{load_car_to_blockstore, load_car_to_sqlite}; +pub use db::AtpDatabase; +pub use models::*; pub use mst::{dump_mst_keys, repro_mst}; -pub fn run_server(port: u16) -> Result<()> { - // TODO: log access requests +pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> { // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs + // TODO: could just open connection on every request? + let db = Mutex::new(AtpDatabase::open(atp_db_path)?); + let mut _blockstore: BlockStore = + BlockStore::open(blockstore_db_path, Default::default())?; + let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { info!("{} {} ({:?})", req.method(), req.raw_url(), elap); }; @@ -43,50 +49,15 @@ pub fn run_server(port: u16) -> Result<()> { Response::text("didn't get other thing") // TODO: parse and echo back JSON body }, + + (GET) ["/xrpc/com.atproto.getRecord"] => { + // TODO: JSON response + // TODO: handle error + let mut db = db.lock().unwrap().new_connection().unwrap(); + Response::text(db.get_record("asdf", "123", "blah").unwrap().to_string()) + }, _ => rouille::Response::empty_404() ) }) }); } - -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { - let mut db: BlockStore = - { BlockStore::open(db_path, Default::default())? }; - - load_car_to_blockstore(&mut db, car_path) -} - -pub fn load_car_to_blockstore(db: &mut BlockStore, car_path: &PathBuf) -> Result<()> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - rt.block_on(inner_car_loader(db, car_path)) -} - -// this async function is wrapped in the sync version above -async fn inner_car_loader(db: &mut BlockStore, car_path: &PathBuf) -> Result<()> { - println!("{} - {}", std::env::current_dir()?.display(), car_path.display()); - let car_reader = { - let file = File::open(car_path).await?; - let buf_reader = BufReader::new(file); - CarReader::new(buf_reader).await? - }; - let car_header = car_reader.header().clone(); - - car_reader - .stream() - .try_for_each(|(cid, raw)| { - // TODO: error handling here instead of unwrap (?) - let block = Block::new(cid, raw).unwrap(); - db.put_block(block, None).unwrap(); - futures::future::ready(Ok(())) - }) - .await?; - - // pin the header - if car_header.roots().len() >= 1 { - db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; - } - - Ok(()) -} diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs new file mode 100644 index 0000000..6f9bf81 --- /dev/null +++ b/adenosine-pds/src/models.rs @@ -0,0 +1,8 @@ +use serde; + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct AtpSession { + pub jwt: String, + pub name: String, + pub did: String, +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index a2a394f..429d8c8 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -1,3 +1,4 @@ +use crate::load_car_to_blockstore; use anyhow::{anyhow, Result}; use ipfs_sqlite_block_store::BlockStore; use libipld::cbor::DagCborCodec; @@ -9,7 +10,6 @@ use libipld::{Cid, DagCbor}; use log::{debug, error, info}; use std::collections::BTreeMap; use std::path::PathBuf; -use crate::load_car_to_blockstore; #[derive(Debug, DagCbor, PartialEq, Eq)] struct CommitNode { @@ -85,8 +85,7 @@ fn print_mst_keys(db: &mut BlockStore, cid: &Cid) -> Res } pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> { - let mut db: BlockStore = - { BlockStore::open(db_path, Default::default())? }; + let mut db: BlockStore = BlockStore::open(db_path, Default::default())?; let all_aliases: Vec<(Vec, Cid)> = db.aliases()?; if all_aliases.is_empty() { @@ -287,11 +286,9 @@ fn serialize_wip_tree( } pub fn repro_mst(car_path: &PathBuf) -> Result<()> { - // open a temp block store - let mut db: BlockStore = { - BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? - }; + let mut db: BlockStore = + { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? }; // load CAR contents from file load_car_to_blockstore(&mut db, car_path)?; diff --git a/adenosine-pds/tests/test_repro_mst.rs b/adenosine-pds/tests/test_repro_mst.rs index 4dde364..ad5fba1 100644 --- a/adenosine-pds/tests/test_repro_mst.rs +++ b/adenosine-pds/tests/test_repro_mst.rs @@ -1,4 +1,3 @@ - use adenosine_pds::repro_mst; use std::path::PathBuf; use std::str::FromStr; -- cgit v1.2.3