diff options
-rw-r--r-- | adenosine-pds/src/atp_db.sql | 65 | ||||
-rw-r--r-- | adenosine-pds/src/db.rs | 68 | ||||
-rw-r--r-- | adenosine-pds/src/did.rs | 10 | ||||
-rw-r--r-- | adenosine-pds/src/lib.rs | 95 | ||||
-rw-r--r-- | adenosine-pds/src/models.rs | 5 |
5 files changed, 182 insertions, 61 deletions
diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql index a6fce1c..259ab9d 100644 --- a/adenosine-pds/src/atp_db.sql +++ b/adenosine-pds/src/atp_db.sql @@ -1,20 +1,22 @@ ------------ atproto system tables +----------- atproto PDS system tables CREATE TABLE account( did TEXT PRIMARY KEY NOT NULL, username TEXT NOT NULL, email TEXT NOT NULL, password_bcrypt TEXT NOT NULL, - signing_key TEXT NOT NULL + recovery_pubkey TEXT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ) ); CREATE UNIQUE INDEX account_username_uniq_idx on account(lower(username)); CREATE UNIQUE INDEX account_email_uniq_idx on account(lower(email)); CREATE TABLE did_doc( did TEXT PRIMARY KEY NOT NULL, + -- TODO: username TEXT NOT NULL, doc_json TEXT NOT NULL, - seen_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ) + indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ) ); CREATE TABLE session( @@ -24,26 +26,57 @@ CREATE TABLE session( PRIMARY KEY(did, jwt) ); -CREATE TABLE repo( - did TEXT PRIMARY KEY NOT NULL, - head_commit TEXT NOT NULL -); +----------- bsky app/index tables -CREATE TABLE record( +CREATE TABLE bsky_post( did TEXT NOT NULL, - collection TEXT NOT NULL, tid TEXT NOT NULL, - record_cid TEXT NOT NULL, + cid TEXT NOT NULL, record_json TEXT NOT NULL, - PRIMARY KEY(did, collection, tid) + reply_root_uri TEXT, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), + PRIMARY KEY(did, tid) ); +CREATE INDEX bsky_post_reply_root_uri_idx on bsky_post(reply_root_uri); -CREATE TABLE password_reset( +CREATE TABLE bsky_repost( did TEXT NOT NULL, - token TEXT NOT NULL, - PRIMARY KEY(did, token) + subject_uri TEXT NOT NULL, + cid TEXT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), + PRIMARY KEY(did, subject_uri) ); +CREATE INDEX bsky_repost_subject_uri_idx on bsky_repost(subject_uri); ------------ bsky app/index tables +CREATE TABLE bsky_like( + did TEXT NOT NULL, + subject_uri TEXT NOT NULL, + cid TEXT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), + PRIMARY KEY(did, subject_uri) +); +CREATE INDEX bsky_like_subject_uri_idx on bsky_like(subject_uri); --- TODO +CREATE TABLE bsky_follow( + did TEXT NOT NULL, + subject_did TEXT NOT NULL, + cid TEXT NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), + PRIMARY KEY(did, subject_did) +); +CREATE INDEX bsky_follow_subject_did_idx on bsky_follow(subject_did); + +-- TODO: notifications +CREATE TABLE bsky_notification( + pk INTEGER PRIMARY KEY AUTOINCREMENT, + user_did TEXT NOT NULL, + subject_uri TEXT NOT NULL, + subject_cid TEXT NOT NULL, + reason TEXT NOT NULL, + seen_at TIMESTAMP WITH TIME ZONE, + indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), +); diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 35c798d..03f6c68 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -95,45 +95,67 @@ impl AtpDatabase { ret } + /// Quick check if an account already exists for given username or email + pub fn account_exists(&mut self, username: &str, email: &str) -> Result<bool> { + let mut stmt = self + .conn + .prepare_cached("SELECT COUNT(*) FROM account WHERE username = $1 OR email = $2")?; + let count: i32 = stmt.query_row(params!(username, email), |row| row.get(0))?; + Ok(count > 0) + } + pub fn create_account( &mut self, + did: &str, username: &str, password: &str, email: &str, - ) -> Result<AtpSession> { - // TODO: validate email (regex?) - // TODO: validate username - // TODO: generate and store signing key - // TODO: generate plc did (randomly for now?) - // TODO: insert did_doc - // TODO: also need to initialize repo with... profile? - { - debug!("bcrypt hashing password (can be slow)..."); - let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?; - let signing_key = "key:TODO"; - let did = "did:TODO"; - let mut stmt = self - .conn - .prepare_cached("INSERT INTO account (username, password_bcrypt, email, did, signing_key) VALUES (?1, ?2, ?3, ?4, ?5)")?; - stmt.execute(params!(username, password_bcrypt, email, did, signing_key))?; - } - self.create_session(username, password) + ) -> Result<()> { + debug!("bcrypt hashing password (can be slow)..."); + let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?; + let did = "did:TODO"; + let mut stmt = self.conn.prepare_cached( + "INSERT INTO account (username, password_bcrypt, email, did) VALUES (?1, ?2, ?3, ?4)", + )?; + stmt.execute(params!(username, password_bcrypt, email, did))?; + Ok(()) } + /// Returns a JWT session token pub fn create_session(&mut self, username: &str, password: &str) -> Result<AtpSession> { let mut stmt = self .conn - .prepare_cached("SELECT password_bcrypt FROM account WHERE username = ?1")?; - let password_bcrypt: String = stmt.query_row(params!(username), |row| row.get(0))?; + .prepare_cached("SELECT did, password_bcrypt FROM account WHERE username = ?1")?; + let (did, password_bcrypt): (String, String) = + stmt.query_row(params!(username), |row| Ok((row.get(0)?, row.get(1)?)))?; if !bcrypt::verify(password, &password_bcrypt)? { return Err(anyhow!("password did not match")); } // TODO: generate JWT - // TODO: insert session wtih JWT + // TODO: insert session with JWT + let jwt = "jwt:BOGUS"; Ok(AtpSession { + did, name: username.to_string(), - did: "did:TODO".to_string(), - jwt: "jwt:TODO".to_string(), + accessJwt: jwt.to_string(), + refreshJwt: jwt.to_string(), }) } + + /// Returns the DID that a token is valid for + pub fn check_auth_token(&mut self, jwt: &str) -> Result<String> { + let mut stmt = self + .conn + .prepare_cached("SELECT did FROM session WHERE jwt = $1")?; + let did = stmt.query_row(params!(jwt), |row| row.get(0))?; + Ok(did) + } + + pub fn put_did_doc(&mut self, did: &str, did_doc: &Value) -> Result<()> { + let mut stmt = self + .conn + .prepare_cached("INSERT INTO did_doc (did, doc_json) VALUES (?1, ?2)")?; + stmt.execute(params!(did, did_doc.to_string()))?; + Ok(()) + } } diff --git a/adenosine-pds/src/did.rs b/adenosine-pds/src/did.rs index f33f512..109a717 100644 --- a/adenosine-pds/src/did.rs +++ b/adenosine-pds/src/did.rs @@ -12,7 +12,7 @@ use serde_json::json; #[allow(non_snake_case)] #[derive(Debug, DagCbor, PartialEq, Eq, Clone)] -struct CreateOp { +pub struct CreateOp { #[ipld(rename = "type")] pub op_type: String, pub signingKey: String, @@ -50,7 +50,7 @@ impl UnsignedCreateOp { } impl CreateOp { - fn new( + pub fn new( username: String, atp_pds: String, keypair: &KeyPair, @@ -72,7 +72,7 @@ impl CreateOp { unsigned.into_signed(sig) } - fn did_plc(&self) -> String { + pub fn did_plc(&self) -> String { // dump DAG-CBOR let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, self) .expect("encode DAG-CBOR"); @@ -89,7 +89,7 @@ impl CreateOp { format!("did:plc:{}", &digest_b32[0..24]) } - fn did_doc(&self) -> serde_json::Value { + pub fn did_doc(&self) -> serde_json::Value { let did = self.did_plc(); // TODO: let user_url = format!("https://{}.test", self.username); @@ -140,7 +140,7 @@ impl CreateOp { } /// This method only makes sense on the "genesis" create object - fn verify_self(&self) -> Result<()> { + pub fn verify_self(&self) -> Result<()> { let key = PubKey::from_did_key(&self.signingKey)?; let unsigned = { let cpy = (*self).clone(); diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 90cac3f..5627a7e 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,7 +1,8 @@ use anyhow::{anyhow, Result}; +use libipld::Ipld; use log::{error, info}; -use rouille::{router, try_or_400, Request, Response}; -use serde_json::json; +use rouille::{router, Request, Response}; +use serde_json::{json, Value}; use std::fmt; use std::path::PathBuf; use std::sync::Mutex; @@ -25,11 +26,15 @@ struct AccountRequest { email: String, username: String, password: String, + inviteCode: Option<String>, + recoveryKey: Option<String>, } struct AtpService { pub repo: RepoStore, pub atp_db: AtpDatabase, + pub pds_keypair: KeyPair, + pub pds_public_url: String, } #[derive(Debug)] @@ -74,6 +79,9 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf let srv = Mutex::new(AtpService { repo: RepoStore::open(blockstore_db_path)?, atp_db: AtpDatabase::open(atp_db_path)?, + // XXX: reuse a keypair + pds_keypair: KeyPair::new_random(), + pds_public_url: format!("http://localhost:{}", port).to_string(), }); let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| { @@ -105,6 +113,31 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf }); } +/// Intentionally serializing with this instead of DAG-JSON, because ATP schemas don't encode CID +/// links in any special way, they just pass the CID as a string. +fn ipld_into_json_value(val: Ipld) -> Value { + match val { + Ipld::Null => Value::Null, + Ipld::Bool(b) => Value::Bool(b), + Ipld::Integer(v) => json!(v), + Ipld::Float(v) => json!(v), + Ipld::String(s) => Value::String(s), + Ipld::Bytes(b) => Value::String(data_encoding::BASE64_NOPAD.encode(&b)), + Ipld::List(l) => Value::Array(l.into_iter().map(|v| ipld_into_json_value(v)).collect()), + Ipld::Map(m) => Value::Object(serde_json::Map::from_iter( + m.into_iter().map(|(k, v)| (k, ipld_into_json_value(v))), + )), + Ipld::Link(c) => Value::String(c.to_string()), + } +} + +fn xrpc_required_param(request: &Request, key: &str) -> Result<String> { + Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!( + "require '{}' query parameter", + key + )))?) +} + fn xrpc_get_atproto( srv: &Mutex<AtpService>, method: &str, @@ -115,14 +148,14 @@ fn xrpc_get_atproto( Ok(json!({"availableUserDomains": ["test"], "inviteCodeRequired": false})) } "getRecord" => { - let did = request.get_param("user").unwrap(); - let collection = request.get_param("collection").unwrap(); - let rkey = request.get_param("rkey").unwrap(); + let did = xrpc_required_param(request, "did")?; + let collection = xrpc_required_param(request, "collection")?; + let rkey = xrpc_required_param(request, "rkey")?; let mut srv = srv.lock().expect("service mutex"); let key = format!("/{}/{}", collection, rkey); match srv.repo.get_atp_record(&did, &collection, &rkey) { // TODO: format as JSON, not text debug - Ok(Some(ipld)) => Ok(json!({ "thing": format!("{:?}", ipld) })), + Ok(Some(ipld)) => Ok(ipld_into_json_value(ipld)), Ok(None) => Err(anyhow!(XrpcError::NotFound(format!( "could not find record: {}", key @@ -131,12 +164,12 @@ fn xrpc_get_atproto( } } "syncGetRoot" => { - let did = request.get_param("did").unwrap(); + let did = xrpc_required_param(request, "did")?; let mut srv = srv.lock().expect("service mutex"); srv.repo .lookup_commit(&did)? .map(|v| json!({ "root": v })) - .ok_or(anyhow!("XXX: missing")) + .ok_or(XrpcError::NotFound(format!("no repository found for DID: {}", did)).into()) } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: com.atproto.{}", @@ -149,18 +182,50 @@ fn xrpc_post_atproto( srv: &Mutex<AtpService>, method: &str, request: &Request, -) -> Result<serde_json::Value> { +) -> Result<impl serde::Serialize> { match method { "createAccount" => { - // TODO: failure here is a 400, not 500 + // TODO: generate did:plc, and insert an empty record/pointer to repo + + // validate account request let req: AccountRequest = rouille::input::json_input(request) .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?; + // TODO: validate username, email, recoverykey + + // check if account already exists (fast path, also confirmed by database schema) let mut srv = srv.lock().unwrap(); - Ok(serde_json::to_value(srv.atp_db.create_account( - &req.username, - &req.password, - &req.email, - )?)?) + if srv.atp_db.account_exists(&req.username, &req.email)? { + Err(XrpcError::BadRequest(format!( + "username or email already exists" + )))?; + }; + + // generate DID + let create_op = did::CreateOp::new( + req.username.clone(), + srv.pds_public_url.clone(), + &srv.pds_keypair, + req.recoveryKey, + ); + create_op.verify_self()?; + let did = create_op.did_plc(); + let did_doc = create_op.did_doc(); + + // register in ATP DB and generate DID doc + srv.atp_db + .create_account(&did, &req.username, &req.password, &req.email)?; + srv.atp_db.put_did_doc(&did, &did_doc)?; + + // insert empty MST repository + let root_cid = { + let empty_map_cid: String = srv.repo.mst_from_map(&Default::default())?; + let meta_cid = srv.repo.write_metadata(&did)?; + srv.repo.write_root(&did, &meta_cid, None, &empty_map_cid)? + }; + let _commit_cid = srv.repo.write_commit(&did, &root_cid, "XXX-dummy-sig")?; + + let sess = srv.atp_db.create_session(&req.username, &req.password)?; + Ok(sess) } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: com.atproto.{}", diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs index 6f9bf81..cfbd877 100644 --- a/adenosine-pds/src/models.rs +++ b/adenosine-pds/src/models.rs @@ -2,7 +2,8 @@ use serde; #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct AtpSession { - pub jwt: String, - pub name: String, pub did: String, + pub name: String, + pub accessJwt: String, + pub refreshJwt: String, } |