diff options
Diffstat (limited to 'adenosine-pds/src')
| -rw-r--r-- | adenosine-pds/src/bin/adenosine-pds.rs | 30 | ||||
| -rw-r--r-- | adenosine-pds/src/car.rs | 10 | ||||
| -rw-r--r-- | adenosine-pds/src/db.rs | 60 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 118 | ||||
| -rw-r--r-- | adenosine-pds/src/models.rs | 27 | ||||
| -rw-r--r-- | adenosine-pds/src/mst.rs | 2 | ||||
| -rw-r--r-- | adenosine-pds/src/repo.rs | 20 | 
7 files changed, 195 insertions, 72 deletions
| diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs index ce36587..beb423a 100644 --- a/adenosine-pds/src/bin/adenosine-pds.rs +++ b/adenosine-pds/src/bin/adenosine-pds.rs @@ -43,8 +43,17 @@ struct Opt {  enum Command {      /// Start ATP server as a foreground process      Serve { -        #[structopt(long, default_value = "3030")] +        /// Localhost port to listen on +        #[structopt(long, default_value = "3030", env = "ATP_PDS_PORT")]          port: u16, + +        /// Secret key, encoded in hex. Use 'generate-secret' to create a new one +        #[structopt( +            long = "--pds-secret-key", +            env = "ATP_PDS_SECRET_KEY", +            hide_env_values = true +        )] +        pds_secret_key: String,      },      /// Helper to import an IPLD CARv1 file in to sqlite data store @@ -53,11 +62,15 @@ enum Command {          car_path: std::path::PathBuf,          /// name of pointer to root of CAR DAG tree. Usually a DID +        #[structopt(long, default_value = "last-import")]          alias: String,      },      /// Helper to print MST keys/docs from a sqlite repo      Inspect, + +    /// Generate a PDS secret key and print to stdout (as hex) +    GenerateSecret,  }  fn main() -> Result<()> { @@ -82,14 +95,23 @@ fn main() -> Result<()> {      debug!("config parsed, starting up");      match opt.cmd { -        Command::Serve { port } => { +        Command::Serve { +            port, +            pds_secret_key, +        } => {              // TODO: log some config stuff? -            run_server(port, &opt.blockstore_db_path, &opt.atp_db_path) +            let keypair = KeyPair::from_hex(&pds_secret_key)?; +            run_server(port, &opt.blockstore_db_path, &opt.atp_db_path, keypair)          }          // TODO: handle alias          Command::Import { car_path, alias } => { -            load_car_to_sqlite(&opt.blockstore_db_path, &car_path) +            load_car_to_sqlite(&opt.blockstore_db_path, &car_path, &alias)          }          Command::Inspect {} => mst::dump_mst_keys(&opt.blockstore_db_path), +        Command::GenerateSecret {} => { +            let keypair = KeyPair::new_random(); +            println!("{}", keypair.to_hex()); +            Ok(()) +        }      }  } diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs index 35cc3fd..63911e5 100644 --- a/adenosine-pds/src/car.rs +++ b/adenosine-pds/src/car.rs @@ -8,28 +8,30 @@ use std::path::PathBuf;  use tokio::fs::File;  use tokio::io::BufReader; -pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> { +pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf, alias: &str) -> Result<()> {      let mut db: BlockStore<libipld::DefaultParams> =          { BlockStore::open(db_path, Default::default())? }; -    load_car_to_blockstore(&mut db, car_path)?; +    load_car_to_blockstore(&mut db, car_path, alias)?;      Ok(())  }  pub fn load_car_to_blockstore(      db: &mut BlockStore<libipld::DefaultParams>,      car_path: &PathBuf, +    alias: &str,  ) -> Result<Cid> {      let rt = tokio::runtime::Builder::new_current_thread()          .enable_all()          .build()?; -    rt.block_on(inner_car_loader(db, car_path)) +    rt.block_on(inner_car_loader(db, car_path, alias))  }  // this async function is wrapped in the sync version above  async fn inner_car_loader(      db: &mut BlockStore<libipld::DefaultParams>,      car_path: &PathBuf, +    alias: &str,  ) -> Result<Cid> {      println!(          "{} - {}", @@ -55,7 +57,7 @@ async fn inner_car_loader(      // pin the header (?)      if car_header.roots().len() >= 1 { -        db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?; +        db.alias(alias.as_bytes(), Some(&car_header.roots()[0]))?;      }      Ok(car_header.roots()[0]) diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 72f2a8d..17006cb 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -3,7 +3,7 @@ use crate::{AtpSession, KeyPair};  use anyhow::{anyhow, Result};  use lazy_static::lazy_static;  use log::debug; -use rusqlite::{params, Connection}; +use rusqlite::{params, Connection, OptionalExtension};  use rusqlite_migration::{Migrations, M};  use serde_json::Value;  use std::path::PathBuf; @@ -60,41 +60,6 @@ impl AtpDatabase {          Ok(AtpDatabase { conn })      } -    pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result<Value> { -        let mut stmt = self.conn.prepare_cached( -            "SELECT record_json FROM record WHERE did = ?1 AND collection = ?2 AND tid = ?3", -        )?; -        Ok(stmt.query_row(params!(did, collection, tid), |row| { -            row.get(0).map(|v: String| Value::from_str(&v)) -        })??) -    } - -    pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result<Vec<String>> { -        let mut stmt = self -            .conn -            .prepare_cached("SELECT tid FROM record WHERE did = ?1 AND collection = ?2")?; -        let ret = stmt -            .query_and_then(params!(did, collection), |row| { -                let v: String = row.get(0)?; -                Ok(v) -            })? -            .collect(); -        ret -    } - -    pub fn get_collection_list(&mut self, did: &str) -> Result<Vec<String>> { -        let mut stmt = self -            .conn -            .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?; -        let ret = stmt -            .query_and_then(params!(did), |row| { -                let v: String = row.get(0)?; -                Ok(v) -            })? -            .collect(); -        ret -    } -      /// Quick check if an account already exists for given username or email      pub fn account_exists(&mut self, username: &str, email: &str) -> Result<bool> {          let mut stmt = self @@ -155,13 +120,21 @@ impl AtpDatabase {          })      } -    /// Returns the DID that a token is valid for -    pub fn check_auth_token(&mut self, jwt: &str) -> Result<String> { +    /// Returns the DID that a token is valid for, or None if session not found +    pub fn check_auth_token(&mut self, jwt: &str) -> Result<Option<String>> {          let mut stmt = self              .conn              .prepare_cached("SELECT did FROM session WHERE jwt = $1")?; -        let did = stmt.query_row(params!(jwt), |row| row.get(0))?; -        Ok(did) +        let did_maybe = stmt.query_row(params!(jwt), |row| row.get(0)).optional()?; +        Ok(did_maybe) +    } + +    pub fn delete_session(&mut self, jwt: &str) -> Result<bool> { +        let mut stmt = self +            .conn +            .prepare_cached("DELETE FROM session WHERE jwt = $1")?; +        let count = stmt.execute(params!(jwt))?; +        Ok(count >= 1)      }      pub fn put_did_doc(&mut self, did: &str, did_doc: &Value) -> Result<()> { @@ -171,4 +144,11 @@ impl AtpDatabase {          stmt.execute(params!(did, did_doc.to_string()))?;          Ok(())      } +    pub fn get_did_doc(&mut self, did: &str) -> Result<Value> { +        let mut stmt = self +            .conn +            .prepare_cached("SELECT doc_json FROM did_doc WHERE did = $1")?; +        let doc_json: String = stmt.query_row(params!(did), |row| row.get(0))?; +        Ok(Value::from_str(&doc_json)?) +    }  } diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index 4ea2f7b..0d73881 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -1,6 +1,6 @@  use anyhow::{anyhow, Result};  use libipld::Ipld; -use log::{error, info, warn}; +use log::{debug, error, info, warn};  use rouille::{router, Request, Response};  use serde_json::{json, Value};  use std::fmt; @@ -23,16 +23,6 @@ pub use models::*;  pub use repo::{RepoCommit, RepoStore};  pub use ucan_p256::P256KeyMaterial; -#[allow(non_snake_case)] -#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] -struct AccountRequest { -    email: String, -    username: String, -    password: String, -    inviteCode: Option<String>, -    recoveryKey: Option<String>, -} -  struct AtpService {      pub repo: RepoStore,      pub atp_db: AtpDatabase, @@ -77,14 +67,16 @@ fn xrpc_wrap<S: serde::Serialize>(resp: Result<S>) -> Response {      }  } -pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> { -    // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs - +pub fn run_server( +    port: u16, +    blockstore_db_path: &PathBuf, +    atp_db_path: &PathBuf, +    keypair: KeyPair, +) -> Result<()> {      let srv = Mutex::new(AtpService {          repo: RepoStore::open(blockstore_db_path)?,          atp_db: AtpDatabase::open(atp_db_path)?, -        // XXX: reuse a keypair -        pds_keypair: KeyPair::new_random(), +        pds_keypair: keypair,          pds_public_url: format!("http://localhost:{}", port).to_string(),      }); @@ -99,6 +91,9 @@ pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf              elap          );      }; + +    // TODO: robots.txt +    // TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs      rouille::start_server(format!("localhost:{}", port), move |request| {          rouille::log_custom(request, log_ok, log_err, || {              router!(request, @@ -142,6 +137,21 @@ fn xrpc_required_param(request: &Request, key: &str) -> Result<String> {      )))?)  } +/// Returns DID of validated user +fn xrpc_check_auth_header(srv: &mut AtpService, request: &Request) -> Result<String> { +    let header = request +        .header("Authorization") +        .ok_or(XrpcError::Forbidden(format!("require auth header")))?; +    if !header.starts_with("Bearer ") { +        Err(XrpcError::Forbidden(format!("require bearer token")))?; +    } +    let jwt = header.split(" ").nth(1).unwrap(); +    match srv.atp_db.check_auth_token(&jwt)? { +        Some(did) => Ok(did), +        None => Err(XrpcError::Forbidden(format!("session token not found")))?, +    } +} +  fn xrpc_get_handler(      srv: &Mutex<AtpService>,      method: &str, @@ -175,6 +185,45 @@ fn xrpc_get_handler(                  .map(|v| json!({ "root": v }))                  .ok_or(XrpcError::NotFound(format!("no repository found for DID: {}", did)).into())          } +        "com.atproto.repoListRecords" => { +            // TODO: limit, before, after, tid, reverse +            // TODO: handle non-DID 'user' +            // TODO: validate 'collection' as an NSID +            // TODO: limit result set size +            let did = xrpc_required_param(request, "user")?; +            let collection = xrpc_required_param(request, "collection")?; +            let mut record_list: Vec<Value> = vec![]; +            let mut srv = srv.lock().expect("service mutex"); +            let full_map = srv.repo.mst_to_map(&did)?; +            let prefix = format!("/{}/", collection); +            for (mst_key, cid) in full_map.iter() { +                if mst_key.starts_with(&prefix) { +                    let record = srv.repo.get_ipld(cid)?; +                    record_list.push(json!({ +                        "uri": format!("at://{}{}", did, mst_key), +                        "cid": cid, +                        "value": ipld_into_json_value(record), +                    })); +                } +            } +            Ok(json!({ "records": record_list })) +        } +        "com.atproto.repoDescribe" => { +            let did = xrpc_required_param(request, "user")?; +            // TODO: resolve username? +            let username = did.clone(); +            let mut srv = srv.lock().expect("service mutex"); +            let did_doc = srv.atp_db.get_did_doc(&did)?; +            let collections: Vec<String> = srv.repo.collections(&did)?; +            let desc = RepoDescribe { +                name: username, +                did: did, +                didDoc: did_doc, +                collections: collections, +                nameIsCorrect: true, +            }; +            Ok(json!(desc)) +        }          _ => Err(anyhow!(XrpcError::NotFound(format!(              "XRPC endpoint handler not found: {}",              method @@ -186,12 +235,9 @@ fn xrpc_post_handler(      srv: &Mutex<AtpService>,      method: &str,      request: &Request, -) -> Result<impl serde::Serialize> { +) -> Result<serde_json::Value> {      match method {          "com.atproto.createAccount" => { -            // TODO: generate did:plc, and insert an empty record/pointer to repo -            info!("creating new account"); -              // validate account request              let req: AccountRequest = rouille::input::json_input(request)                  .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?; @@ -205,6 +251,8 @@ fn xrpc_post_handler(                  )))?;              }; +            debug!("trying to create new account: {}", &req.username); +              // generate DID              let create_op = did::CreateOp::new(                  req.username.clone(), @@ -241,7 +289,35 @@ fn xrpc_post_handler(              let sess = srv                  .atp_db                  .create_session(&req.username, &req.password, &keypair)?; -            Ok(sess) +            Ok(json!(sess)) +        } +        "com.atproto.createSession" => { +            let req: AccountRequest = rouille::input::json_input(request) +                .map_err(|e| XrpcError::BadRequest(format!("failed to parse JSON body: {}", e)))?; +            let mut srv = srv.lock().unwrap(); +            let keypair = srv.pds_keypair.clone(); +            Ok(json!(srv.atp_db.create_session( +                &req.username, +                &req.password, +                &keypair +            )?)) +        } +        "com.atproto.deleteSession" => { +            let mut srv = srv.lock().unwrap(); +            let _did = xrpc_check_auth_header(&mut srv, request)?; +            let header = request +                .header("Authorization") +                .ok_or(XrpcError::Forbidden(format!("require auth header")))?; +            if !header.starts_with("Bearer ") { +                Err(XrpcError::Forbidden(format!("require bearer token")))?; +            } +            let jwt = header.split(" ").nth(1).expect("JWT in header"); +            if !srv.atp_db.delete_session(&jwt)? { +                Err(anyhow!( +                    "session token not found, even after using for auth" +                ))? +            }; +            Ok(json!({}))          }          _ => Err(anyhow!(XrpcError::NotFound(format!(              "XRPC endpoint handler not found: {}", diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs index 8f855c5..9da6104 100644 --- a/adenosine-pds/src/models.rs +++ b/adenosine-pds/src/models.rs @@ -1,6 +1,23 @@  use serde;  #[allow(non_snake_case)] +#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub struct AccountRequest { +    pub email: String, +    pub username: String, +    pub password: String, +    pub inviteCode: Option<String>, +    pub recoveryKey: Option<String>, +} + +#[allow(non_snake_case)] +#[derive(Debug, serde::Deserialize, serde::Serialize, PartialEq, Eq)] +pub struct SessionRequest { +    pub username: String, +    pub password: String, +} + +#[allow(non_snake_case)]  #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]  pub struct AtpSession {      pub did: String, @@ -8,3 +25,13 @@ pub struct AtpSession {      pub accessJwt: String,      pub refreshJwt: String,  } + +#[allow(non_snake_case)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct RepoDescribe { +    pub name: String, +    pub did: String, +    pub didDoc: serde_json::Value, +    pub collections: Vec<String>, +    pub nameIsCorrect: bool, +} diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs index b71cc73..94e5f68 100644 --- a/adenosine-pds/src/mst.rs +++ b/adenosine-pds/src/mst.rs @@ -304,7 +304,7 @@ pub fn repro_mst(car_path: &PathBuf) -> Result<()> {          { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? };      // load CAR contents from file -    load_car_to_blockstore(&mut db, car_path)?; +    load_car_to_blockstore(&mut db, car_path, "repro-import")?;      let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?;      if all_aliases.is_empty() { diff --git a/adenosine-pds/src/repo.rs b/adenosine-pds/src/repo.rs index 5351379..8c14a18 100644 --- a/adenosine-pds/src/repo.rs +++ b/adenosine-pds/src/repo.rs @@ -9,6 +9,7 @@ use libipld::store::DefaultParams;  use libipld::{Block, Cid, Ipld};  use std::borrow::Cow;  use std::collections::BTreeMap; +use std::collections::HashSet;  use std::path::PathBuf;  use std::str::FromStr; @@ -141,6 +142,21 @@ impl RepoStore {          }      } +    pub fn collections(&mut self, did: &str) -> Result<Vec<String>> { +        let commit = if let Some(c) = self.lookup_commit(did)? { +            self.get_commit(&c)? +        } else { +            return Err(anyhow!("DID not found in repositories: {}", did)); +        }; +        let map = self.mst_to_map(&commit.mst_cid)?; +        let mut collections: HashSet<String> = Default::default(); +        for k in map.keys() { +            let coll = k.split("/").nth(0).unwrap(); +            collections.insert(coll.to_string()); +        } +        Ok(collections.into_iter().collect()) +    } +      pub fn get_atp_record(          &mut self,          did: &str, @@ -217,8 +233,8 @@ impl RepoStore {      }      /// returns the root commit from CAR file -    pub fn load_car(&mut self, car_path: &PathBuf) -> Result<String> { -        let cid = load_car_to_blockstore(&mut self.db, car_path)?; +    pub fn load_car(&mut self, car_path: &PathBuf, alias: &str) -> Result<String> { +        let cid = load_car_to_blockstore(&mut self.db, car_path, alias)?;          Ok(cid.to_string())      } | 
