summaryrefslogtreecommitdiffstats
path: root/adenosine-pds
diff options
context:
space:
mode:
Diffstat (limited to 'adenosine-pds')
-rw-r--r--adenosine-pds/src/atp_db.sql65
-rw-r--r--adenosine-pds/src/db.rs68
-rw-r--r--adenosine-pds/src/did.rs10
-rw-r--r--adenosine-pds/src/lib.rs95
-rw-r--r--adenosine-pds/src/models.rs5
5 files changed, 182 insertions, 61 deletions
diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql
index a6fce1c..259ab9d 100644
--- a/adenosine-pds/src/atp_db.sql
+++ b/adenosine-pds/src/atp_db.sql
@@ -1,20 +1,22 @@
------------ atproto system tables
+----------- atproto PDS system tables
CREATE TABLE account(
did TEXT PRIMARY KEY NOT NULL,
username TEXT NOT NULL,
email TEXT NOT NULL,
password_bcrypt TEXT NOT NULL,
- signing_key TEXT NOT NULL
+ recovery_pubkey TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') )
);
CREATE UNIQUE INDEX account_username_uniq_idx on account(lower(username));
CREATE UNIQUE INDEX account_email_uniq_idx on account(lower(email));
CREATE TABLE did_doc(
did TEXT PRIMARY KEY NOT NULL,
+ -- TODO: username TEXT NOT NULL,
doc_json TEXT NOT NULL,
- seen_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') )
+ indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') )
);
CREATE TABLE session(
@@ -24,26 +26,57 @@ CREATE TABLE session(
PRIMARY KEY(did, jwt)
);
-CREATE TABLE repo(
- did TEXT PRIMARY KEY NOT NULL,
- head_commit TEXT NOT NULL
-);
+----------- bsky app/index tables
-CREATE TABLE record(
+CREATE TABLE bsky_post(
did TEXT NOT NULL,
- collection TEXT NOT NULL,
tid TEXT NOT NULL,
- record_cid TEXT NOT NULL,
+ cid TEXT NOT NULL,
record_json TEXT NOT NULL,
- PRIMARY KEY(did, collection, tid)
+ reply_root_uri TEXT,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL,
+ indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
+ PRIMARY KEY(did, tid)
);
+CREATE INDEX bsky_post_reply_root_uri_idx on bsky_post(reply_root_uri);
-CREATE TABLE password_reset(
+CREATE TABLE bsky_repost(
did TEXT NOT NULL,
- token TEXT NOT NULL,
- PRIMARY KEY(did, token)
+ subject_uri TEXT NOT NULL,
+ cid TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL,
+ indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
+ PRIMARY KEY(did, subject_uri)
);
+CREATE INDEX bsky_repost_subject_uri_idx on bsky_repost(subject_uri);
------------ bsky app/index tables
+CREATE TABLE bsky_like(
+ did TEXT NOT NULL,
+ subject_uri TEXT NOT NULL,
+ cid TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL,
+ indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
+ PRIMARY KEY(did, subject_uri)
+);
+CREATE INDEX bsky_like_subject_uri_idx on bsky_like(subject_uri);
--- TODO
+CREATE TABLE bsky_follow(
+ did TEXT NOT NULL,
+ subject_did TEXT NOT NULL,
+ cid TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE NOT NULL,
+ indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
+ PRIMARY KEY(did, subject_did)
+);
+CREATE INDEX bsky_follow_subject_did_idx on bsky_follow(subject_did);
+
+-- TODO: notifications
+CREATE TABLE bsky_notification(
+ pk INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_did TEXT NOT NULL,
+ subject_uri TEXT NOT NULL,
+ subject_cid TEXT NOT NULL,
+ reason TEXT NOT NULL,
+ seen_at TIMESTAMP WITH TIME ZONE,
+ indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
+);
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
index 35c798d..03f6c68 100644
--- a/adenosine-pds/src/db.rs
+++ b/adenosine-pds/src/db.rs
@@ -95,45 +95,67 @@ impl AtpDatabase {
ret
}
+ /// Quick check if an account already exists for given username or email
+ pub fn account_exists(&mut self, username: &str, email: &str) -> Result<bool> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT COUNT(*) FROM account WHERE username = $1 OR email = $2")?;
+ let count: i32 = stmt.query_row(params!(username, email), |row| row.get(0))?;
+ Ok(count > 0)
+ }
+
pub fn create_account(
&mut self,
+ did: &str,
username: &str,
password: &str,
email: &str,
- ) -> Result<AtpSession> {
- // TODO: validate email (regex?)
- // TODO: validate username
- // TODO: generate and store signing key
- // TODO: generate plc did (randomly for now?)
- // TODO: insert did_doc
- // TODO: also need to initialize repo with... profile?
- {
- debug!("bcrypt hashing password (can be slow)...");
- let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
- let signing_key = "key:TODO";
- let did = "did:TODO";
- let mut stmt = self
- .conn
- .prepare_cached("INSERT INTO account (username, password_bcrypt, email, did, signing_key) VALUES (?1, ?2, ?3, ?4, ?5)")?;
- stmt.execute(params!(username, password_bcrypt, email, did, signing_key))?;
- }
- self.create_session(username, password)
+ ) -> Result<()> {
+ debug!("bcrypt hashing password (can be slow)...");
+ let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
+ let did = "did:TODO";
+ let mut stmt = self.conn.prepare_cached(
+ "INSERT INTO account (username, password_bcrypt, email, did) VALUES (?1, ?2, ?3, ?4)",
+ )?;
+ stmt.execute(params!(username, password_bcrypt, email, did))?;
+ Ok(())
}
+ /// Returns a JWT session token
pub fn create_session(&mut self, username: &str, password: &str) -> Result<AtpSession> {
let mut stmt = self
.conn
- .prepare_cached("SELECT password_bcrypt FROM account WHERE username = ?1")?;
- let password_bcrypt: String = stmt.query_row(params!(username), |row| row.get(0))?;
+ .prepare_cached("SELECT did, password_bcrypt FROM account WHERE username = ?1")?;
+ let (did, password_bcrypt): (String, String) =
+ stmt.query_row(params!(username), |row| Ok((row.get(0)?, row.get(1)?)))?;
if !bcrypt::verify(password, &password_bcrypt)? {
return Err(anyhow!("password did not match"));
}
// TODO: generate JWT
- // TODO: insert session wtih JWT
+ // TODO: insert session with JWT
+ let jwt = "jwt:BOGUS";
Ok(AtpSession {
+ did,
name: username.to_string(),
- did: "did:TODO".to_string(),
- jwt: "jwt:TODO".to_string(),
+ accessJwt: jwt.to_string(),
+ refreshJwt: jwt.to_string(),
})
}
+
+ /// Returns the DID that a token is valid for
+ pub fn check_auth_token(&mut self, jwt: &str) -> Result<String> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT did FROM session WHERE jwt = $1")?;
+ let did = stmt.query_row(params!(jwt), |row| row.get(0))?;
+ Ok(did)
+ }
+
+ pub fn put_did_doc(&mut self, did: &str, did_doc: &Value) -> Result<()> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("INSERT INTO did_doc (did, doc_json) VALUES (?1, ?2)")?;
+ stmt.execute(params!(did, did_doc.to_string()))?;
+ Ok(())
+ }
}
diff --git a/adenosine-pds/src/did.rs b/adenosine-pds/src/did.rs
index f33f512..109a717 100644
--- a/adenosine-pds/src/did.rs
+++ b/adenosine-pds/src/did.rs
@@ -12,7 +12,7 @@ use serde_json::json;
#[allow(non_snake_case)]
#[derive(Debug, DagCbor, PartialEq, Eq, Clone)]
-struct CreateOp {
+pub struct CreateOp {
#[ipld(rename = "type")]
pub op_type: String,
pub signingKey: String,
@@ -50,7 +50,7 @@ impl UnsignedCreateOp {
}
impl CreateOp {
- fn new(
+ pub fn new(
username: String,
atp_pds: String,
keypair: &KeyPair,
@@ -72,7 +72,7 @@ impl CreateOp {
unsigned.into_signed(sig)
}
- fn did_plc(&self) -> String {
+ pub fn did_plc(&self) -> String {
// dump DAG-CBOR
let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, self)
.expect("encode DAG-CBOR");
@@ -89,7 +89,7 @@ impl CreateOp {
format!("did:plc:{}", &digest_b32[0..24])
}
- fn did_doc(&self) -> serde_json::Value {
+ pub fn did_doc(&self) -> serde_json::Value {
let did = self.did_plc();
// TODO:
let user_url = format!("https://{}.test", self.username);
@@ -140,7 +140,7 @@ impl CreateOp {
}
/// This method only makes sense on the "genesis" create object
- fn verify_self(&self) -> Result<()> {
+ pub fn verify_self(&self) -> Result<()> {
let key = PubKey::from_did_key(&self.signingKey)?;
let unsigned = {
let cpy = (*self).clone();
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 90cac3f..5627a7e 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,7 +1,8 @@
use anyhow::{anyhow, Result};
+use libipld::Ipld;
use log::{error, info};
-use rouille::{router, try_or_400, Request, Response};
-use serde_json::json;
+use rouille::{router, Request, Response};
+use serde_json::{json, Value};
use std::fmt;
use std::path::PathBuf;
use std::sync::Mutex;
@@ -25,11 +26,15 @@ struct AccountRequest {
email: String,
username: String,
password: String,
+ inviteCode: Option<String>,
+ recoveryKey: Option<String>,
}
struct AtpService {
pub repo: RepoStore,
pub atp_db: AtpDatabase,
+ pub pds_keypair: KeyPair,
+ pub pds_public_url: String,
}
#[derive(Debug)]
@@ -74,6 +79,9 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf
let srv = Mutex::new(AtpService {
repo: RepoStore::open(blockstore_db_path)?,
atp_db: AtpDatabase::open(atp_db_path)?,
+ // XXX: reuse a keypair
+ pds_keypair: KeyPair::new_random(),
+ pds_public_url: format!("http://localhost:{}", port).to_string(),
});
let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
@@ -105,6 +113,31 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf
});
}
+/// Intentionally serializing with this instead of DAG-JSON, because ATP schemas don't encode CID
+/// links in any special way, they just pass the CID as a string.
+fn ipld_into_json_value(val: Ipld) -> Value {
+ match val {
+ Ipld::Null => Value::Null,
+ Ipld::Bool(b) => Value::Bool(b),
+ Ipld::Integer(v) => json!(v),
+ Ipld::Float(v) => json!(v),
+ Ipld::String(s) => Value::String(s),
+ Ipld::Bytes(b) => Value::String(data_encoding::BASE64_NOPAD.encode(&b)),
+ Ipld::List(l) => Value::Array(l.into_iter().map(|v| ipld_into_json_value(v)).collect()),
+ Ipld::Map(m) => Value::Object(serde_json::Map::from_iter(
+ m.into_iter().map(|(k, v)| (k, ipld_into_json_value(v))),
+ )),
+ Ipld::Link(c) => Value::String(c.to_string()),
+ }
+}
+
+fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
+ Ok(request.get_param(key).ok_or(XrpcError::BadRequest(format!(
+ "require '{}' query parameter",
+ key
+ )))?)
+}
+
fn xrpc_get_atproto(
srv: &Mutex<AtpService>,
method: &str,
@@ -115,14 +148,14 @@ fn xrpc_get_atproto(
Ok(json!({"availableUserDomains": ["test"], "inviteCodeRequired": false}))
}
"getRecord" => {
- let did = request.get_param("user").unwrap();
- let collection = request.get_param("collection").unwrap();
- let rkey = request.get_param("rkey").unwrap();
+ let did = xrpc_required_param(request, "did")?;
+ let collection = xrpc_required_param(request, "collection")?;
+ let rkey = xrpc_required_param(request, "rkey")?;
let mut srv = srv.lock().expect("service mutex");
let key = format!("/{}/{}", collection, rkey);
match srv.repo.get_atp_record(&did, &collection, &rkey) {
// TODO: format as JSON, not text debug
- Ok(Some(ipld)) => Ok(json!({ "thing": format!("{:?}", ipld) })),
+ Ok(Some(ipld)) => Ok(ipld_into_json_value(ipld)),
Ok(None) => Err(anyhow!(XrpcError::NotFound(format!(
"could not find record: {}",
key
@@ -131,12 +164,12 @@ fn xrpc_get_atproto(
}
}
"syncGetRoot" => {
- let did = request.get_param("did").unwrap();
+ let did = xrpc_required_param(request, "did")?;
let mut srv = srv.lock().expect("service mutex");
srv.repo
.lookup_commit(&did)?
.map(|v| json!({ "root": v }))
- .ok_or(anyhow!("XXX: missing"))
+ .ok_or(XrpcError::NotFound(format!("no repository found for DID: {}", did)).into())
}
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: com.atproto.{}",
@@ -149,18 +182,50 @@ fn xrpc_post_atproto(
srv: &Mutex<AtpService>,
method: &str,
request: &Request,
-) -> Result<serde_json::Value> {
+) -> Result<impl serde::Serialize> {
match method {
"createAccount" => {
- // TODO: failure here is a 400, not 500
+ // TODO: generate did:plc, and insert an empty record/pointer to repo
+
+ // validate account request
let req: AccountRequest = rouille::input::json_input(request)
.map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?;
+ // TODO: validate username, email, recoverykey
+
+ // check if account already exists (fast path, also confirmed by database schema)
let mut srv = srv.lock().unwrap();
- Ok(serde_json::to_value(srv.atp_db.create_account(
- &req.username,
- &req.password,
- &req.email,
- )?)?)
+ if srv.atp_db.account_exists(&req.username, &req.email)? {
+ Err(XrpcError::BadRequest(format!(
+ "username or email already exists"
+ )))?;
+ };
+
+ // generate DID
+ let create_op = did::CreateOp::new(
+ req.username.clone(),
+ srv.pds_public_url.clone(),
+ &srv.pds_keypair,
+ req.recoveryKey,
+ );
+ create_op.verify_self()?;
+ let did = create_op.did_plc();
+ let did_doc = create_op.did_doc();
+
+ // register in ATP DB and generate DID doc
+ srv.atp_db
+ .create_account(&did, &req.username, &req.password, &req.email)?;
+ srv.atp_db.put_did_doc(&did, &did_doc)?;
+
+ // insert empty MST repository
+ let root_cid = {
+ let empty_map_cid: String = srv.repo.mst_from_map(&Default::default())?;
+ let meta_cid = srv.repo.write_metadata(&did)?;
+ srv.repo.write_root(&did, &meta_cid, None, &empty_map_cid)?
+ };
+ let _commit_cid = srv.repo.write_commit(&did, &root_cid, "XXX-dummy-sig")?;
+
+ let sess = srv.atp_db.create_session(&req.username, &req.password)?;
+ Ok(sess)
}
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: com.atproto.{}",
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
index 6f9bf81..cfbd877 100644
--- a/adenosine-pds/src/models.rs
+++ b/adenosine-pds/src/models.rs
@@ -2,7 +2,8 @@ use serde;
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct AtpSession {
- pub jwt: String,
- pub name: String,
pub did: String,
+ pub name: String,
+ pub accessJwt: String,
+ pub refreshJwt: String,
}