aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-11-04 00:49:49 -0700
committerBryan Newbold <bnewbold@robocracy.org>2022-11-04 00:49:49 -0700
commit6c5c1e84b3540e3a4da81334f34b4e53cc818f4d (patch)
tree140cd3963067c2e529dbee0fc3d2869adb9204e4
parented47a0bc0d1d6692c7c365bf33a69d8017129f96 (diff)
downloadadenosine-6c5c1e84b3540e3a4da81334f34b4e53cc818f4d.tar.gz
adenosine-6c5c1e84b3540e3a4da81334f34b4e53cc818f4d.zip
pds: more progress
-rw-r--r--adenosine-pds/src/bin/adenosine-pds.rs30
-rw-r--r--adenosine-pds/src/car.rs10
-rw-r--r--adenosine-pds/src/db.rs60
-rw-r--r--adenosine-pds/src/lib.rs118
-rw-r--r--adenosine-pds/src/models.rs27
-rw-r--r--adenosine-pds/src/mst.rs2
-rw-r--r--adenosine-pds/src/repo.rs20
7 files changed, 195 insertions, 72 deletions
diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs
index ce36587..beb423a 100644
--- a/adenosine-pds/src/bin/adenosine-pds.rs
+++ b/adenosine-pds/src/bin/adenosine-pds.rs
@@ -43,8 +43,17 @@ struct Opt {
enum Command {
/// Start ATP server as a foreground process
Serve {
- #[structopt(long, default_value = "3030")]
+ /// Localhost port to listen on
+ #[structopt(long, default_value = "3030", env = "ATP_PDS_PORT")]
port: u16,
+
+ /// Secret key, encoded in hex. Use 'generate-secret' to create a new one
+ #[structopt(
+ long = "--pds-secret-key",
+ env = "ATP_PDS_SECRET_KEY",
+ hide_env_values = true
+ )]
+ pds_secret_key: String,
},
/// Helper to import an IPLD CARv1 file in to sqlite data store
@@ -53,11 +62,15 @@ enum Command {
car_path: std::path::PathBuf,
/// name of pointer to root of CAR DAG tree. Usually a DID
+ #[structopt(long, default_value = "last-import")]
alias: String,
},
/// Helper to print MST keys/docs from a sqlite repo
Inspect,
+
+ /// Generate a PDS secret key and print to stdout (as hex)
+ GenerateSecret,
}
fn main() -> Result<()> {
@@ -82,14 +95,23 @@ fn main() -> Result<()> {
debug!("config parsed, starting up");
match opt.cmd {
- Command::Serve { port } => {
+ Command::Serve {
+ port,
+ pds_secret_key,
+ } => {
// TODO: log some config stuff?
- run_server(port, &opt.blockstore_db_path, &opt.atp_db_path)
+ let keypair = KeyPair::from_hex(&pds_secret_key)?;
+ run_server(port, &opt.blockstore_db_path, &opt.atp_db_path, keypair)
}
// TODO: handle alias
Command::Import { car_path, alias } => {
- load_car_to_sqlite(&opt.blockstore_db_path, &car_path)
+ load_car_to_sqlite(&opt.blockstore_db_path, &car_path, &alias)
}
Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path),
+ Command::GenerateSecret {} => {
+ let keypair = KeyPair::new_random();
+ println!("{}", keypair.to_hex());
+ Ok(())
+ }
}
}
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs
index 35cc3fd..63911e5 100644
--- a/adenosine-pds/src/car.rs
+++ b/adenosine-pds/src/car.rs
@@ -8,28 +8,30 @@ use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::BufReader;
-pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> {
+pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> {
let mut db: BlockStore<libipld::DefaultParams> =
{ BlockStore::open(db_path, Default::default())? };
- load_car_to_blockstore(&mut db, car_path)?;
+ load_car_to_blockstore(&mut db, car_path, alias)?;
Ok(())
}
pub fn load_car_to_blockstore(
db: &mut BlockStore<libipld::DefaultParams>,
car_path: &PathBuf,
+ alias: &str,
) -> Result<Cid> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
- rt.block_on(inner_car_loader(db, car_path))
+ rt.block_on(inner_car_loader(db, car_path, alias))
}
// this async function is wrapped in the sync version above
async fn inner_car_loader(
db: &mut BlockStore<libipld::DefaultParams>,
car_path: &PathBuf,
+ alias: &str,
) -> Result<Cid> {
println!(
"{} - {}",
@@ -55,7 +57,7 @@ async fn inner_car_loader(
// pin the header (?)
if car_header.roots().len() >= 1 {
- db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?;
+ db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?;
}
Ok(car_header.roots()[0])
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
index 72f2a8d..17006cb 100644
--- a/adenosine-pds/src/db.rs
+++ b/adenosine-pds/src/db.rs
@@ -3,7 +3,7 @@ use crate::{AtpSession, KeyPair};
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use log::debug;
-use rusqlite::{params, Connection};
+use rusqlite::{params, Connection, OptionalExtension};
use rusqlite_migration::{Migrations, M};
use serde_json::Value;
use std::path::PathBuf;
@@ -60,41 +60,6 @@ impl AtpDatabase {
Ok(AtpDatabase { conn })
}
- pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result<Value> {
- let mut stmt = self.conn.prepare_cached(
- "SELECT record_json FROM record WHERE did = ?1 AND collection = ?2 AND tid = ?3",
- )?;
- Ok(stmt.query_row(params!(did, collection, tid), |row| {
- row.get(0).map(|v: String| Value::from_str(&v))
- })??)
- }
-
- pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result<Vec<String>> {
- let mut stmt = self
- .conn
- .prepare_cached("SELECT tid FROM record WHERE did = ?1 AND collection = ?2")?;
- let ret = stmt
- .query_and_then(params!(did, collection), |row| {
- let v: String = row.get(0)?;
- Ok(v)
- })?
- .collect();
- ret
- }
-
- pub fn get_collection_list(&mut self, did: &str) -> Result<Vec<String>> {
- let mut stmt = self
- .conn
- .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?;
- let ret = stmt
- .query_and_then(params!(did), |row| {
- let v: String = row.get(0)?;
- Ok(v)
- })?
- .collect();
- ret
- }
-
/// Quick check if an account already exists for given username or email
pub fn account_exists(&mut self, username: &str, email: &str) -> Result<bool> {
let mut stmt = self
@@ -155,13 +120,21 @@ impl AtpDatabase {
})
}
- /// Returns the DID that a token is valid for
- pub fn check_auth_token(&mut self, jwt: &str) -> Result<String> {
+ /// Returns the DID that a token is valid for, or None if session not found
+ pub fn check_auth_token(&mut self, jwt: &str) -> Result<Option<String>> {
let mut stmt = self
.conn
.prepare_cached("SELECT did FROM session WHERE jwt = $1")?;
- let did = stmt.query_row(params!(jwt), |row| row.get(0))?;
- Ok(did)
+ let did_maybe = stmt.query_row(params!(jwt), |row| row.get(0)).optional()?;
+ Ok(did_maybe)
+ }
+
+ pub fn delete_session(&mut self, jwt: &str) -> Result<bool> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("DELETE FROM session WHERE jwt = $1")?;
+ let count = stmt.execute(params!(jwt))?;
+ Ok(count >= 1)
}
pub fn put_did_doc(&mut self, did: &str, did_doc: &Value) -> Result<()> {
@@ -171,4 +144,11 @@ impl AtpDatabase {
stmt.execute(params!(did, did_doc.to_string()))?;
Ok(())
}
+ pub fn get_did_doc(&mut self, did: &str) -> Result<Value> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT doc_json FROM did_doc WHERE did = $1")?;
+ let doc_json: String = stmt.query_row(params!(did), |row| row.get(0))?;
+ Ok(Value::from_str(&doc_json)?)
+ }
}
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 4ea2f7b..0d73881 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,6 +1,6 @@
use anyhow::{anyhow, Result};
use libipld::Ipld;
-use log::{error, info, warn};
+use log::{debug, error, info, warn};
use rouille::{router, Request, Response};
use serde_json::{json, Value};
use std::fmt;
@@ -23,16 +23,6 @@ pub use models::*;
pub use repo::{RepoCommit, RepoStore};
pub use ucan_p256::P256KeyMaterial;
-#[allow(non_snake_case)]
-#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
-struct AccountRequest {
- email: String,
- username: String,
- password: String,
- inviteCode: Option<String>,
- recoveryKey: Option<String>,
-}
-
struct AtpService {
pub repo: RepoStore,
pub atp_db: AtpDatabase,
@@ -77,14 +67,16 @@ fn xrpc_wrap<S: serde::Serialize>(resp: Result<S>) -> Response {
}
}
-pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> {
- // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
-
+pub fn run_server(
+ port: u16,
+ blockstore_db_path: &PathBuf,
+ atp_db_path: &PathBuf,
+ keypair: KeyPair,
+) -> Result<()> {
let srv = Mutex::new(AtpService {
repo: RepoStore::open(blockstore_db_path)?,
atp_db: AtpDatabase::open(atp_db_path)?,
- // XXX: reuse a keypair
- pds_keypair: KeyPair::new_random(),
+ pds_keypair: keypair,
pds_public_url: format!("http://localhost:{}", port).to_string(),
});
@@ -99,6 +91,9 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf
elap
);
};
+
+ // TODO: robots.txt
+ // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
rouille::start_server(format!("localhost:{}", port), move |request| {
rouille::log_custom(request, log_ok, log_err, || {
router!(request,
@@ -142,6 +137,21 @@ fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {
)))?)
}
+/// Returns DID of validated user
+fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<String> {
+ let header = request
+ .header("Authorization")
+ .ok_or(XrpcError::Forbidden(format!("require auth header")))?;
+ if !header.starts_with("Bearer ") {
+ Err(XrpcError::Forbidden(format!("require bearer token")))?;
+ }
+ let jwt = header.split(" ").nth(1).unwrap();
+ match srv.atp_db.check_auth_token(&jwt)? {
+ Some(did) => Ok(did),
+ None => Err(XrpcError::Forbidden(format!("session token not found")))?,
+ }
+}
+
fn xrpc_get_handler(
srv: &Mutex<AtpService>,
method: &str,
@@ -175,6 +185,45 @@ fn xrpc_get_handler(
.map(|v| json!({ "root": v }))
.ok_or(XrpcError::NotFound(format!("no repository found for DID: {}", did)).into())
}
+ "com.atproto.repoListRecords" => {
+ // TODO: limit, before, after, tid, reverse
+ // TODO: handle non-DID 'user'
+ // TODO: validate 'collection' as an NSID
+ // TODO: limit result set size
+ let did = xrpc_required_param(request, "user")?;
+ let collection = xrpc_required_param(request, "collection")?;
+ let mut record_list: Vec<Value> = vec![];
+ let mut srv = srv.lock().expect("service mutex");
+ let full_map = srv.repo.mst_to_map(&did)?;
+ let prefix = format!("/{}/", collection);
+ for (mst_key, cid) in full_map.iter() {
+ if mst_key.starts_with(&prefix) {
+ let record = srv.repo.get_ipld(cid)?;
+ record_list.push(json!({
+ "uri": format!("at://{}{}", did, mst_key),
+ "cid": cid,
+ "value": ipld_into_json_value(record),
+ }));
+ }
+ }
+ Ok(json!({ "records": record_list }))
+ }
+ "com.atproto.repoDescribe" => {
+ let did = xrpc_required_param(request, "user")?;
+ // TODO: resolve username?
+ let username = did.clone();
+ let mut srv = srv.lock().expect("service mutex");
+ let did_doc = srv.atp_db.get_did_doc(&did)?;
+ let collections: Vec<String> = srv.repo.collections(&did)?;
+ let desc = RepoDescribe {
+ name: username,
+ did: did,
+ didDoc: did_doc,
+ collections: collections,
+ nameIsCorrect: true,
+ };
+ Ok(json!(desc))
+ }
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: {}",
method
@@ -186,12 +235,9 @@ fn xrpc_post_handler(
srv: &Mutex<AtpService>,
method: &str,
request: &Request,
-) -> Result<impl serde::Serialize> {
+) -> Result<serde_json::Value> {
match method {
"com.atproto.createAccount" => {
- // TODO: generate did:plc, and insert an empty record/pointer to repo
- info!("creating new account");
-
// validate account request
let req: AccountRequest = rouille::input::json_input(request)
.map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?;
@@ -205,6 +251,8 @@ fn xrpc_post_handler(
)))?;
};
+ debug!("trying to create new account: {}", &req.username);
+
// generate DID
let create_op = did::CreateOp::new(
req.username.clone(),
@@ -241,7 +289,35 @@ fn xrpc_post_handler(
let sess = srv
.atp_db
.create_session(&req.username, &req.password, &keypair)?;
- Ok(sess)
+ Ok(json!(sess))
+ }
+ "com.atproto.createSession" => {
+ let req: AccountRequest = rouille::input::json_input(request)
+ .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?;
+ let mut srv = srv.lock().unwrap();
+ let keypair = srv.pds_keypair.clone();
+ Ok(json!(srv.atp_db.create_session(
+ &req.username,
+ &req.password,
+ &keypair
+ )?))
+ }
+ "com.atproto.deleteSession" => {
+ let mut srv = srv.lock().unwrap();
+ let _did = xrpc_check_auth_header(&mut srv, request)?;
+ let header = request
+ .header("Authorization")
+ .ok_or(XrpcError::Forbidden(format!("require auth header")))?;
+ if !header.starts_with("Bearer ") {
+ Err(XrpcError::Forbidden(format!("require bearer token")))?;
+ }
+ let jwt = header.split(" ").nth(1).expect("JWT in header");
+ if !srv.atp_db.delete_session(&jwt)? {
+ Err(anyhow!(
+ "session token not found, even after using for auth"
+ ))?
+ };
+ Ok(json!({}))
}
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: {}",
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
index 8f855c5..9da6104 100644
--- a/adenosine-pds/src/models.rs
+++ b/adenosine-pds/src/models.rs
@@ -1,6 +1,23 @@
use serde;
#[allow(non_snake_case)]
+#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
+pub struct AccountRequest {
+ pub email: String,
+ pub username: String,
+ pub password: String,
+ pub inviteCode: Option<String>,
+ pub recoveryKey: Option<String>,
+}
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)]
+pub struct SessionRequest {
+ pub username: String,
+ pub password: String,
+}
+
+#[allow(non_snake_case)]
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct AtpSession {
pub did: String,
@@ -8,3 +25,13 @@ pub struct AtpSession {
pub accessJwt: String,
pub refreshJwt: String,
}
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct RepoDescribe {
+ pub name: String,
+ pub did: String,
+ pub didDoc: serde_json::Value,
+ pub collections: Vec<String>,
+ pub nameIsCorrect: bool,
+}
diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs
index b71cc73..94e5f68 100644
--- a/adenosine-pds/src/mst.rs
+++ b/adenosine-pds/src/mst.rs
@@ -304,7 +304,7 @@ pub fn repro_mst(car_path: &PathBuf) -> Result<()> {
{ BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? };
// load CAR contents from file
- load_car_to_blockstore(&mut db, car_path)?;
+ load_car_to_blockstore(&mut db, car_path, "repro-import")?;
let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?;
if all_aliases.is_empty() {
diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs
index 5351379..8c14a18 100644
--- a/adenosine-pds/src/repo.rs
+++ b/adenosine-pds/src/repo.rs
@@ -9,6 +9,7 @@ use libipld::store::DefaultParams;
use libipld::{Block, Cid, Ipld};
use std::borrow::Cow;
use std::collections::BTreeMap;
+use std::collections::HashSet;
use std::path::PathBuf;
use std::str::FromStr;
@@ -141,6 +142,21 @@ impl RepoStore {
}
}
+ pub fn collections(&mut self, did: &str) -> Result<Vec<String>> {
+ let commit = if let Some(c) = self.lookup_commit(did)? {
+ self.get_commit(&c)?
+ } else {
+ return Err(anyhow!("DID not found in repositories: {}", did));
+ };
+ let map = self.mst_to_map(&commit.mst_cid)?;
+ let mut collections: HashSet<String> = Default::default();
+ for k in map.keys() {
+ let coll = k.split("/").nth(0).unwrap();
+ collections.insert(coll.to_string());
+ }
+ Ok(collections.into_iter().collect())
+ }
+
pub fn get_atp_record(
&mut self,
did: &str,
@@ -217,8 +233,8 @@ impl RepoStore {
}
/// returns the root commit from CAR file
- pub fn load_car(&mut self, car_path: &PathBuf) -> Result<String> {
- let cid = load_car_to_blockstore(&mut self.db, car_path)?;
+ pub fn load_car(&mut self, car_path: &PathBuf, alias: &str) -> Result<String> {
+ let cid = load_car_to_blockstore(&mut self.db, car_path, alias)?;
Ok(cid.to_string())
}