diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-01 11:28:46 -0700 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-01 11:28:46 -0700 |
commit | eec74745e4c4af7d744509d66cb9b56481c471d3 (patch) | |
tree | 67f1f0827c2a447ea0e8f3678b6fbe465239e786 /adenosine-pds/src | |
parent | a03eb78a0c10625baefb7fe4d5b7d00cf5403f94 (diff) | |
download | adenosine-eec74745e4c4af7d744509d66cb9b56481c471d3.tar.gz adenosine-eec74745e4c4af7d744509d66cb9b56481c471d3.zip |
pds: more skeleton progress
Diffstat (limited to 'adenosine-pds/src')
-rw-r--r-- | adenosine-pds/src/atp_db.sql | 50 | ||||
-rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 6 | ||||
-rw-r--r-- | adenosine-pds/src/car.rs | 61 | ||||
-rw-r--r-- | adenosine-pds/src/db.rs | 137 | ||||
-rw-r--r-- | adenosine-pds/src/lib.rs | 71 | ||||
-rw-r--r-- | adenosine-pds/src/models.rs | 8 | ||||
-rw-r--r-- | adenosine-pds/src/mst.rs | 11 |
7 files changed, 283 insertions, 61 deletions
diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql new file mode 100644 index 0000000..918a89c --- /dev/null +++ b/adenosine-pds/src/atp_db.sql @@ -0,0 +1,50 @@ + +----------- atproto system tables + +CREATE TABLE account( + did TEXT PRIMARY KEY NOT NULL, + username TEXT NOT NULL, + email TEXT NOT NULL, + password_bcrypt TEXT NOT NULL, + signing_key TEXT NOT NULL, +); +CREATE UNIQUE INDEX account_username_uniq_idx on account(lower(username)); +CREATE UNIQUE INDEX account_email_uniq_idx on account(lower(email)); + +CREATE TABLE did_doc( + did TEXT PRIMARY KEY NOT NULL, + doc_json TEXT NOT NULL, + seen_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, +); + +CREATE TABLE session( + did TEXT NOT NULL, + jwt TEXT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL, + PRIMARY KEY(did, jwt) +); + +CREATE TABLE repo( + did TEXT PRIMARY KEY NOT NULL, + head_commit TEXT NOT NULL, +); + +CREATE TABLE record( + did TEXT NOT NULL, + collection TEXT NOT NULL, + tid TEXT NOT NULL, + record_cid TEXT NOT NULL, + record_json TEXT NOT NULL, + PRIMARY KEY(did, collection, tid) +); + +CREATE TABLE password_reset( + did TEXT NOT NULL, + token TEXT NOT NULL, + PRIMARY KEY(did, token) +); + + +----------- bsky app/index tables + + diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index 44c4cef..b76d015 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -80,11 +80,9 @@ fn main() -> Result<()> { match opt.cmd { Command::Serve { port } => { // TODO: log some config stuff? - run_server(port) - } - Command::Import { car_path } => { - load_car_to_sqlite(&opt.blockstore_db_path, &car_path) + run_server(port, &opt.blockstore_db_path, &opt.atp_db_path) } + Command::Import { car_path } => load_car_to_sqlite(&opt.blockstore_db_path, &car_path), Command::Inspect {} => dump_mst_keys(&opt.blockstore_db_path), } } diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs new file mode 100644 index 0000000..5731848 --- /dev/null +++ b/adenosine-pds/src/car.rs @@ -0,0 +1,61 @@ +use anyhow::Result; + +use futures::TryStreamExt; +use ipfs_sqlite_block_store::BlockStore; +use iroh_car::CarReader; +use libipld::block::Block; +use std::path::PathBuf; +use tokio::fs::File; +use tokio::io::BufReader; + +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { + let mut db: BlockStore<libipld::DefaultParams> = + { BlockStore::open(db_path, Default::default())? }; + + load_car_to_blockstore(&mut db, car_path) +} + +pub fn load_car_to_blockstore( + db: &mut BlockStore<libipld::DefaultParams>, + car_path: &PathBuf, +) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(inner_car_loader(db, car_path)) +} + +// this async function is wrapped in the sync version above +async fn inner_car_loader( + db: &mut BlockStore<libipld::DefaultParams>, + car_path: &PathBuf, +) -> Result<()> { + println!( + "{} - {}", + std::env::current_dir()?.display(), + car_path.display() + ); + let car_reader = { + let file = File::open(car_path).await?; + let buf_reader = BufReader::new(file); + CarReader::new(buf_reader).await? + }; + let car_header = car_reader.header().clone(); + + car_reader + .stream() + .try_for_each(|(cid, raw)| { + // TODO: error handling here instead of unwrap (?) + let block = Block::new(cid, raw).unwrap(); + db.put_block(block, None).unwrap(); + futures::future::ready(Ok(())) + }) + .await?; + + // pin the header + if car_header.roots().len() >= 1 { + db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; + } + + Ok(()) +} diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs new file mode 100644 index 0000000..1d6d4ed --- /dev/null +++ b/adenosine-pds/src/db.rs @@ -0,0 +1,137 @@ +use crate::AtpSession; +/// ATP database (as distinct from blockstore) +use anyhow::{anyhow, Result}; +use lazy_static::lazy_static; +use rusqlite::{params, Connection}; +use rusqlite_migration::{Migrations, M}; +use serde_json::Value; +use std::path::PathBuf; +use std::str::FromStr; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn migrations_test() { + assert!(MIGRATIONS.validate().is_ok()); + } +} + +lazy_static! { + static ref MIGRATIONS: Migrations<'static> = + Migrations::new(vec![M::up(include_str!("atp_db.sql")),]); +} + +#[derive(Debug)] +pub struct AtpDatabase { + conn: Connection, +} + +impl AtpDatabase { + pub fn open(path: &PathBuf) -> Result<Self> { + let mut conn = Connection::open(path)?; + MIGRATIONS.to_latest(&mut conn)?; + // any pragma would happen here + Ok(AtpDatabase { conn }) + } + + /// temporary database, eg for tests. + /// + /// TODO: should create a tmp file on ramdisk (/var/tmp?) instead of opening an in-memory + /// database. in-memory database can't be used with multiple connections + pub fn open_ephemeral() -> Result<Self> { + let mut conn = Connection::open_in_memory()?; + MIGRATIONS.to_latest(&mut conn)?; + // any pragma would happen here + Ok(AtpDatabase { conn }) + } + + /// Creates an entirely new connection to the same database + /// + /// Skips re-running migrations. + /// + /// Fails for ephemeral databases. + pub fn new_connection(&self) -> Result<Self> { + // TODO: let path = std::path::PathBuf::from(self.conn.path().ok_or(Err(anyhow!("expected real database")))?); + let path = std::path::PathBuf::from(self.conn.path().expect("expected real database")); + let conn = Connection::open(path)?; + Ok(AtpDatabase { conn }) + } + + pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result<Value> { + let mut stmt = self.conn.prepare_cached( + "SELECT record_json FROM record WHERE did = ?1 collection = ?2 tid = ?3", + )?; + Ok(stmt.query_row(params!(did, collection, tid), |row| { + row.get(0).map(|v: String| Value::from_str(&v)) + })??) + } + + pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result<Vec<String>> { + let mut stmt = self + .conn + .prepare_cached("SELECT tid FROM record WHERE did = ?1 collection = ?2")?; + let ret = stmt + .query_and_then(params!(did, collection), |row| { + let v: String = row.get(0)?; + Ok(v) + })? + .collect(); + ret + } + + pub fn get_collection_list(&mut self, did: &str) -> Result<Vec<String>> { + let mut stmt = self + .conn + .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?; + let ret = stmt + .query_and_then(params!(did), |row| { + let v: String = row.get(0)?; + Ok(v) + })? + .collect(); + ret + } + + pub fn create_account( + &mut self, + username: &str, + password: &str, + email: &str, + ) -> Result<AtpSession> { + // TODO: validate email (regex?) + // TODO: validate username + // TODO: generate and store signing key + // TODO: generate plc did (randomly for now?) + // TODO: insert did_doc + // TODO: also need to initialize repo with... profile? + { + let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?; + let signing_key = "key:TODO"; + let did = "did:TODO"; + let mut stmt = self + .conn + .prepare_cached("INSERT INTO account (username, password_bcrypt, email, did, signing_key) VALUES (?1, ?2, ?3, ?4, ?5)")?; + stmt.execute(params!(username, password_bcrypt, email, did, signing_key))?; + } + self.create_session(username, password) + } + + pub fn create_session(&mut self, username: &str, password: &str) -> Result<AtpSession> { + let mut stmt = self + .conn + .prepare_cached("SELECT password_bcrypt FROM account WHERE username = ?1")?; + let password_bcrypt: String = stmt.query_row(params!(username), |row| row.get(0))?; + if !bcrypt::verify(password, &password_bcrypt)? { + return Err(anyhow!("password did not match")); + } + // TODO: generate JWT + // TODO: insert session wtih JWT + Ok(AtpSession { + name: username.to_string(), + did: "did:TODO".to_string(), + jwt: "jwt:TODO".to_string(), + }) + } +} diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 1785640..a8dc46a 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,23 +1,29 @@ use anyhow::Result; use log::{error, info}; use rouille::{router, Request, Response}; +use std::path::PathBuf; +use std::sync::Mutex; -use futures::TryStreamExt; use ipfs_sqlite_block_store::BlockStore; -use iroh_car::CarReader; -use libipld::block::Block; -use std::path::PathBuf; -use tokio::fs::File; -use tokio::io::BufReader; +mod car; +mod db; +mod models; mod mst; +pub use car::{load_car_to_blockstore, load_car_to_sqlite}; +pub use db::AtpDatabase; +pub use models::*; pub use mst::{dump_mst_keys, repro_mst}; -pub fn run_server(port: u16) -> Result<()> { - // TODO: log access requests +pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> { // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs + // TODO: could just open connection on every request? + let db = Mutex::new(AtpDatabase::open(atp_db_path)?); + let mut _blockstore: BlockStore<libipld::DefaultParams> = + BlockStore::open(blockstore_db_path, Default::default())?; + let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { info!("{} {} ({:?})", req.method(), req.raw_url(), elap); }; @@ -43,50 +49,15 @@ pub fn run_server(port: u16) -> Result<()> { Response::text("didn't get other thing") // TODO: parse and echo back JSON body }, + + (GET) ["/xrpc/com.atproto.getRecord"] => { + // TODO: JSON response + // TODO: handle error + let mut db = db.lock().unwrap().new_connection().unwrap(); + Response::text(db.get_record("asdf", "123", "blah").unwrap().to_string()) + }, _ => rouille::Response::empty_404() ) }) }); } - -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { - let mut db: BlockStore<libipld::DefaultParams> = - { BlockStore::open(db_path, Default::default())? }; - - load_car_to_blockstore(&mut db, car_path) -} - -pub fn load_car_to_blockstore(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - rt.block_on(inner_car_loader(db, car_path)) -} - -// this async function is wrapped in the sync version above -async fn inner_car_loader(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> { - println!("{} - {}", std::env::current_dir()?.display(), car_path.display()); - let car_reader = { - let file = File::open(car_path).await?; - let buf_reader = BufReader::new(file); - CarReader::new(buf_reader).await? - }; - let car_header = car_reader.header().clone(); - - car_reader - .stream() - .try_for_each(|(cid, raw)| { - // TODO: error handling here instead of unwrap (?) - let block = Block::new(cid, raw).unwrap(); - db.put_block(block, None).unwrap(); - futures::future::ready(Ok(())) - }) - .await?; - - // pin the header - if car_header.roots().len() >= 1 { - db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; - } - - Ok(()) -} diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs new file mode 100644 index 0000000..6f9bf81 --- /dev/null +++ b/adenosine-pds/src/models.rs @@ -0,0 +1,8 @@ +use serde; + +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct AtpSession { + pub jwt: String, + pub name: String, + pub did: String, +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index a2a394f..429d8c8 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -1,3 +1,4 @@ +use crate::load_car_to_blockstore; use anyhow::{anyhow, Result}; use ipfs_sqlite_block_store::BlockStore; use libipld::cbor::DagCborCodec; @@ -9,7 +10,6 @@ use libipld::{Cid, DagCbor}; use log::{debug, error, info}; use std::collections::BTreeMap; use std::path::PathBuf; -use crate::load_car_to_blockstore; #[derive(Debug, DagCbor, PartialEq, Eq)] struct CommitNode { @@ -85,8 +85,7 @@ fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Res } pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> { - let mut db: BlockStore<libipld::DefaultParams> = - { BlockStore::open(db_path, Default::default())? }; + let mut db: BlockStore<libipld::DefaultParams> = BlockStore::open(db_path, Default::default())?; let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?; if all_aliases.is_empty() { @@ -287,11 +286,9 @@ fn serialize_wip_tree( } pub fn repro_mst(car_path: &PathBuf) -> Result<()> { - // open a temp block store - let mut db: BlockStore<libipld::DefaultParams> = { - BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? - }; + let mut db: BlockStore<libipld::DefaultParams> = + { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? }; // load CAR contents from file load_car_to_blockstore(&mut db, car_path)?; |