From 6c5c1e84b3540e3a4da81334f34b4e53cc818f4d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Fri, 4 Nov 2022 00:49:49 -0700 Subject: pds: more progress --- adenosine-pds/src/bin/adenosine-pds.rs | 30 +++++++-- adenosine-pds/src/car.rs | 10 +-- adenosine-pds/src/db.rs | 60 ++++++----------- adenosine-pds/src/lib.rs | 118 +++++++++++++++++++++++++++------ adenosine-pds/src/models.rs | 27 ++++++++ adenosine-pds/src/mst.rs | 2 +- adenosine-pds/src/repo.rs | 20 +++++- 7 files changed, 195 insertions(+), 72 deletions(-) (limited to 'adenosine-pds') diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index ce36587..beb423a 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -43,8 +43,17 @@ struct Opt { enum Command { /// Start ATP server as a foreground process Serve { - #[structopt(long, default_value = "3030")] + /// Localhost port to listen on + #[structopt(long, default_value = "3030", env = "ATP_PDS_PORT")] port: u16, + + /// Secret key, encoded in hex. Use 'generate-secret' to create a new one + #[structopt( + long = "--pds-secret-key", + env = "ATP_PDS_SECRET_KEY", + hide_env_values = true + )] + pds_secret_key: String, }, /// Helper to import an IPLD CARv1 file in to sqlite data store @@ -53,11 +62,15 @@ enum Command { car_path: std::path::PathBuf, /// name of pointer to root of CAR DAG tree. Usually a DID + #[structopt(long, default_value = "last-import")] alias: String, }, /// Helper to print MST keys/docs from a sqlite repo Inspect, + + /// Generate a PDS secret key and print to stdout (as hex) + GenerateSecret, } fn main() -> Result<()> { @@ -82,14 +95,23 @@ fn main() -> Result<()> { debug!("config parsed, starting up"); match opt.cmd { - Command::Serve { port } => { + Command::Serve { + port, + pds_secret_key, + } => { // TODO: log some config stuff? - run_server(port, &opt.blockstore_db_path, &opt.atp_db_path) + let keypair = KeyPair::from_hex(&pds_secret_key)?; + run_server(port, &opt.blockstore_db_path, &opt.atp_db_path, keypair) } // TODO: handle alias Command::Import { car_path, alias } => { - load_car_to_sqlite(&opt.blockstore_db_path, &car_path) + load_car_to_sqlite(&opt.blockstore_db_path, &car_path, &alias) } Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path), + Command::GenerateSecret {} => { + let keypair = KeyPair::new_random(); + println!("{}", keypair.to_hex()); + Ok(()) + } } } diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs index 35cc3fd..63911e5 100644 --- a/adenosine-pds/src/car.rs +++ b/adenosine-pds/src/car.rs @@ -8,28 +8,30 @@ use std::path::PathBuf; use tokio::fs::File; use tokio::io::BufReader; -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> { let mut db: BlockStore = { BlockStore::open(db_path, Default::default())? }; - load_car_to_blockstore(&mut db, car_path)?; + load_car_to_blockstore(&mut db, car_path, alias)?; Ok(()) } pub fn load_car_to_blockstore( db: &mut BlockStore, car_path: &PathBuf, + alias: &str, ) -> Result { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(inner_car_loader(db, car_path)) + rt.block_on(inner_car_loader(db, car_path, alias)) } // this async function is wrapped in the sync version above async fn inner_car_loader( db: &mut BlockStore, car_path: &PathBuf, + alias: &str, ) -> Result { println!( "{} - {}", @@ -55,7 +57,7 @@ async fn inner_car_loader( // pin the header (?) if car_header.roots().len() >= 1 { - db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; + db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?; } Ok(car_header.roots()[0]) diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 72f2a8d..17006cb 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -3,7 +3,7 @@ use crate::{AtpSession, KeyPair}; use anyhow::{anyhow, Result}; use lazy_static::lazy_static; use log::debug; -use rusqlite::{params, Connection}; +use rusqlite::{params, Connection, OptionalExtension}; use rusqlite_migration::{Migrations, M}; use serde_json::Value; use std::path::PathBuf; @@ -60,41 +60,6 @@ impl AtpDatabase { Ok(AtpDatabase { conn }) } - pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result { - let mut stmt = self.conn.prepare_cached( - "SELECT record_json FROM record WHERE did = ?1 AND collection = ?2 AND tid = ?3", - )?; - Ok(stmt.query_row(params!(did, collection, tid), |row| { - row.get(0).map(|v: String| Value::from_str(&v)) - })??) - } - - pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result> { - let mut stmt = self - .conn - .prepare_cached("SELECT tid FROM record WHERE did = ?1 AND collection = ?2")?; - let ret = stmt - .query_and_then(params!(did, collection), |row| { - let v: String = row.get(0)?; - Ok(v) - })? - .collect(); - ret - } - - pub fn get_collection_list(&mut self, did: &str) -> Result> { - let mut stmt = self - .conn - .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?; - let ret = stmt - .query_and_then(params!(did), |row| { - let v: String = row.get(0)?; - Ok(v) - })? - .collect(); - ret - } - /// Quick check if an account already exists for given username or email pub fn account_exists(&mut self, username: &str, email: &str) -> Result { let mut stmt = self @@ -155,13 +120,21 @@ impl AtpDatabase { }) } - /// Returns the DID that a token is valid for - pub fn check_auth_token(&mut self, jwt: &str) -> Result { + /// Returns the DID that a token is valid for, or None if session not found + pub fn check_auth_token(&mut self, jwt: &str) -> Result> { let mut stmt = self .conn .prepare_cached("SELECT did FROM session WHERE jwt = $1")?; - let did = stmt.query_row(params!(jwt), |row| row.get(0))?; - Ok(did) + let did_maybe = stmt.query_row(params!(jwt), |row| row.get(0)).optional()?; + Ok(did_maybe) + } + + pub fn delete_session(&mut self, jwt: &str) -> Result { + let mut stmt = self + .conn + .prepare_cached("DELETE FROM session WHERE jwt = $1")?; + let count = stmt.execute(params!(jwt))?; + Ok(count >= 1) } pub fn put_did_doc(&mut self, did: &str, did_doc: &Value) -> Result<()> { @@ -171,4 +144,11 @@ impl AtpDatabase { stmt.execute(params!(did, did_doc.to_string()))?; Ok(()) } + pub fn get_did_doc(&mut self, did: &str) -> Result { + let mut stmt = self + .conn + .prepare_cached("SELECT doc_json FROM did_doc WHERE did = $1")?; + let doc_json: String = stmt.query_row(params!(did), |row| row.get(0))?; + Ok(Value::from_str(&doc_json)?) + } } diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 4ea2f7b..0d73881 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Result}; use libipld::Ipld; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use rouille::{router, Request, Response}; use serde_json::{json, Value}; use std::fmt; @@ -23,16 +23,6 @@ pub use models::*; pub use repo::{RepoCommit, RepoStore}; pub use ucan_p256::P256KeyMaterial; -#[allow(non_snake_case)] -#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] -struct AccountRequest { - email: String, - username: String, - password: String, - inviteCode: Option, - recoveryKey: Option, -} - struct AtpService { pub repo: RepoStore, pub atp_db: AtpDatabase, @@ -77,14 +67,16 @@ fn xrpc_wrap(resp: Result) -> Response { } } -pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> { - // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs - +pub fn run_server( + port: u16, + blockstore_db_path: &PathBuf, + atp_db_path: &PathBuf, + keypair: KeyPair, +) -> Result<()> { let srv = Mutex::new(AtpService { repo: RepoStore::open(blockstore_db_path)?, atp_db: AtpDatabase::open(atp_db_path)?, - // XXX: reuse a keypair - pds_keypair: KeyPair::new_random(), + pds_keypair: keypair, pds_public_url: format!("http://localhost:{}", port).to_string(), }); @@ -99,6 +91,9 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf elap ); }; + + // TODO: robots.txt + // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs rouille::start_server(format!("localhost:{}", port), move |request| { rouille::log_custom(request, log_ok, log_err, || { router!(request, @@ -142,6 +137,21 @@ fn xrpc_required_param(request: &Request, key: &str) -> Result { )))?) } +/// Returns DID of validated user +fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result { + let header = request + .header("Authorization") + .ok_or(XrpcError::Forbidden(format!("require auth header")))?; + if !header.starts_with("Bearer ") { + Err(XrpcError::Forbidden(format!("require bearer token")))?; + } + let jwt = header.split(" ").nth(1).unwrap(); + match srv.atp_db.check_auth_token(&jwt)? { + Some(did) => Ok(did), + None => Err(XrpcError::Forbidden(format!("session token not found")))?, + } +} + fn xrpc_get_handler( srv: &Mutex, method: &str, @@ -175,6 +185,45 @@ fn xrpc_get_handler( .map(|v| json!({ "root": v })) .ok_or(XrpcError::NotFound(format!("no repository found for DID: {}", did)).into()) } + "com.atproto.repoListRecords" => { + // TODO: limit, before, after, tid, reverse + // TODO: handle non-DID 'user' + // TODO: validate 'collection' as an NSID + // TODO: limit result set size + let did = xrpc_required_param(request, "user")?; + let collection = xrpc_required_param(request, "collection")?; + let mut record_list: Vec = vec![]; + let mut srv = srv.lock().expect("service mutex"); + let full_map = srv.repo.mst_to_map(&did)?; + let prefix = format!("/{}/", collection); + for (mst_key, cid) in full_map.iter() { + if mst_key.starts_with(&prefix) { + let record = srv.repo.get_ipld(cid)?; + record_list.push(json!({ + "uri": format!("at://{}{}", did, mst_key), + "cid": cid, + "value": ipld_into_json_value(record), + })); + } + } + Ok(json!({ "records": record_list })) + } + "com.atproto.repoDescribe" => { + let did = xrpc_required_param(request, "user")?; + // TODO: resolve username? + let username = did.clone(); + let mut srv = srv.lock().expect("service mutex"); + let did_doc = srv.atp_db.get_did_doc(&did)?; + let collections: Vec = srv.repo.collections(&did)?; + let desc = RepoDescribe { + name: username, + did: did, + didDoc: did_doc, + collections: collections, + nameIsCorrect: true, + }; + Ok(json!(desc)) + } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: {}", method @@ -186,12 +235,9 @@ fn xrpc_post_handler( srv: &Mutex, method: &str, request: &Request, -) -> Result { +) -> Result { match method { "com.atproto.createAccount" => { - // TODO: generate did:plc, and insert an empty record/pointer to repo - info!("creating new account"); - // validate account request let req: AccountRequest = rouille::input::json_input(request) .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?; @@ -205,6 +251,8 @@ fn xrpc_post_handler( )))?; }; + debug!("trying to create new account: {}", &req.username); + // generate DID let create_op = did::CreateOp::new( req.username.clone(), @@ -241,7 +289,35 @@ fn xrpc_post_handler( let sess = srv .atp_db .create_session(&req.username, &req.password, &keypair)?; - Ok(sess) + Ok(json!(sess)) + } + "com.atproto.createSession" => { + let req: AccountRequest = rouille::input::json_input(request) + .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?; + let mut srv = srv.lock().unwrap(); + let keypair = srv.pds_keypair.clone(); + Ok(json!(srv.atp_db.create_session( + &req.username, + &req.password, + &keypair + )?)) + } + "com.atproto.deleteSession" => { + let mut srv = srv.lock().unwrap(); + let _did = xrpc_check_auth_header(&mut srv, request)?; + let header = request + .header("Authorization") + .ok_or(XrpcError::Forbidden(format!("require auth header")))?; + if !header.starts_with("Bearer ") { + Err(XrpcError::Forbidden(format!("require bearer token")))?; + } + let jwt = header.split(" ").nth(1).expect("JWT in header"); + if !srv.atp_db.delete_session(&jwt)? { + Err(anyhow!( + "session token not found, even after using for auth" + ))? + }; + Ok(json!({})) } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: {}", diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs index 8f855c5..9da6104 100644 --- a/adenosine-pds/src/models.rs +++ b/adenosine-pds/src/models.rs @@ -1,5 +1,22 @@ use serde; +#[allow(non_snake_case)] +#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub struct AccountRequest { + pub email: String, + pub username: String, + pub password: String, + pub inviteCode: Option, + pub recoveryKey: Option, +} + +#[allow(non_snake_case)] +#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub struct SessionRequest { + pub username: String, + pub password: String, +} + #[allow(non_snake_case)] #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct AtpSession { @@ -8,3 +25,13 @@ pub struct AtpSession { pub accessJwt: String, pub refreshJwt: String, } + +#[allow(non_snake_case)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct RepoDescribe { + pub name: String, + pub did: String, + pub didDoc: serde_json::Value, + pub collections: Vec, + pub nameIsCorrect: bool, +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index b71cc73..94e5f68 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -304,7 +304,7 @@ pub fn repro_mst(car_path: &PathBuf) -> Result<()> { { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? }; // load CAR contents from file - load_car_to_blockstore(&mut db, car_path)?; + load_car_to_blockstore(&mut db, car_path, "repro-import")?; let all_aliases: Vec<(Vec, Cid)> = db.aliases()?; if all_aliases.is_empty() { diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index 5351379..8c14a18 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -9,6 +9,7 @@ use libipld::store::DefaultParams; use libipld::{Block, Cid, Ipld}; use std::borrow::Cow; use std::collections::BTreeMap; +use std::collections::HashSet; use std::path::PathBuf; use std::str::FromStr; @@ -141,6 +142,21 @@ impl RepoStore { } } + pub fn collections(&mut self, did: &str) -> Result> { + let commit = if let Some(c) = self.lookup_commit(did)? { + self.get_commit(&c)? + } else { + return Err(anyhow!("DID not found in repositories: {}", did)); + }; + let map = self.mst_to_map(&commit.mst_cid)?; + let mut collections: HashSet = Default::default(); + for k in map.keys() { + let coll = k.split("/").nth(0).unwrap(); + collections.insert(coll.to_string()); + } + Ok(collections.into_iter().collect()) + } + pub fn get_atp_record( &mut self, did: &str, @@ -217,8 +233,8 @@ impl RepoStore { } /// returns the root commit from CAR file - pub fn load_car(&mut self, car_path: &PathBuf) -> Result { - let cid = load_car_to_blockstore(&mut self.db, car_path)?; + pub fn load_car(&mut self, car_path: &PathBuf, alias: &str) -> Result { + let cid = load_car_to_blockstore(&mut self.db, car_path, alias)?; Ok(cid.to_string()) } -- cgit v1.2.3