summaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src
diff options
context:
space:
mode:
Diffstat (limited to 'adenosine-pds/src')
-rw-r--r--adenosine-pds/src/atp_db.sql50
-rw-r--r--adenosine-pds/src/bin/adenosine-pds.rs6
-rw-r--r--adenosine-pds/src/car.rs61
-rw-r--r--adenosine-pds/src/db.rs137
-rw-r--r--adenosine-pds/src/lib.rs71
-rw-r--r--adenosine-pds/src/models.rs8
-rw-r--r--adenosine-pds/src/mst.rs11
7 files changed, 283 insertions, 61 deletions
diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql
new file mode 100644
index 0000000..918a89c
--- /dev/null
+++ b/adenosine-pds/src/atp_db.sql
@@ -0,0 +1,50 @@
+
+----------- atproto system tables
+
+CREATE TABLE account(
+ did TEXT PRIMARY KEY NOT NULL,
+ username TEXT NOT NULL,
+ email TEXT NOT NULL,
+ password_bcrypt TEXT NOT NULL,
+ signing_key TEXT NOT NULL,
+);
+CREATE UNIQUE INDEX account_username_uniq_idx on account(lower(username));
+CREATE UNIQUE INDEX account_email_uniq_idx on account(lower(email));
+
+CREATE TABLE did_doc(
+ did TEXT PRIMARY KEY NOT NULL,
+ doc_json TEXT NOT NULL,
+ seen_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
+);
+
+CREATE TABLE session(
+ did TEXT NOT NULL,
+ jwt TEXT NOT NULL,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT now() NOT NULL,
+ PRIMARY KEY(did, jwt)
+);
+
+CREATE TABLE repo(
+ did TEXT PRIMARY KEY NOT NULL,
+ head_commit TEXT NOT NULL,
+);
+
+CREATE TABLE record(
+ did TEXT NOT NULL,
+ collection TEXT NOT NULL,
+ tid TEXT NOT NULL,
+ record_cid TEXT NOT NULL,
+ record_json TEXT NOT NULL,
+ PRIMARY KEY(did, collection, tid)
+);
+
+CREATE TABLE password_reset(
+ did TEXT NOT NULL,
+ token TEXT NOT NULL,
+ PRIMARY KEY(did, token)
+);
+
+
+----------- bsky app/index tables
+
+
diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs
index 44c4cef..b76d015 100644
--- a/adenosine-pds/src/bin/adenosine-pds.rs
+++ b/adenosine-pds/src/bin/adenosine-pds.rs
@@ -80,11 +80,9 @@ fn main() -> Result<()> {
match opt.cmd {
Command::Serve { port } => {
// TODO: log some config stuff?
- run_server(port)
- }
- Command::Import { car_path } => {
- load_car_to_sqlite(&opt.blockstore_db_path, &car_path)
+ run_server(port, &opt.blockstore_db_path, &opt.atp_db_path)
}
+ Command::Import { car_path } => load_car_to_sqlite(&opt.blockstore_db_path, &car_path),
Command::Inspect {} => dump_mst_keys(&opt.blockstore_db_path),
}
}
diff --git a/adenosine-pds/src/car.rs b/adenosine-pds/src/car.rs
new file mode 100644
index 0000000..5731848
--- /dev/null
+++ b/adenosine-pds/src/car.rs
@@ -0,0 +1,61 @@
+use anyhow::Result;
+
+use futures::TryStreamExt;
+use ipfs_sqlite_block_store::BlockStore;
+use iroh_car::CarReader;
+use libipld::block::Block;
+use std::path::PathBuf;
+use tokio::fs::File;
+use tokio::io::BufReader;
+
+pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> {
+ let mut db: BlockStore<libipld::DefaultParams> =
+ { BlockStore::open(db_path, Default::default())? };
+
+ load_car_to_blockstore(&mut db, car_path)
+}
+
+pub fn load_car_to_blockstore(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_path: &PathBuf,
+) -> Result<()> {
+ let rt = tokio::runtime::Builder::new_current_thread()
+ .enable_all()
+ .build()?;
+ rt.block_on(inner_car_loader(db, car_path))
+}
+
+// this async function is wrapped in the sync version above
+async fn inner_car_loader(
+ db: &mut BlockStore<libipld::DefaultParams>,
+ car_path: &PathBuf,
+) -> Result<()> {
+ println!(
+ "{} - {}",
+ std::env::current_dir()?.display(),
+ car_path.display()
+ );
+ let car_reader = {
+ let file = File::open(car_path).await?;
+ let buf_reader = BufReader::new(file);
+ CarReader::new(buf_reader).await?
+ };
+ let car_header = car_reader.header().clone();
+
+ car_reader
+ .stream()
+ .try_for_each(|(cid, raw)| {
+ // TODO: error handling here instead of unwrap (?)
+ let block = Block::new(cid, raw).unwrap();
+ db.put_block(block, None).unwrap();
+ futures::future::ready(Ok(()))
+ })
+ .await?;
+
+ // pin the header
+ if car_header.roots().len() >= 1 {
+ db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?;
+ }
+
+ Ok(())
+}
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
new file mode 100644
index 0000000..1d6d4ed
--- /dev/null
+++ b/adenosine-pds/src/db.rs
@@ -0,0 +1,137 @@
+use crate::AtpSession;
+/// ATP database (as distinct from blockstore)
+use anyhow::{anyhow, Result};
+use lazy_static::lazy_static;
+use rusqlite::{params, Connection};
+use rusqlite_migration::{Migrations, M};
+use serde_json::Value;
+use std::path::PathBuf;
+use std::str::FromStr;
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn migrations_test() {
+ assert!(MIGRATIONS.validate().is_ok());
+ }
+}
+
+lazy_static! {
+ static ref MIGRATIONS: Migrations<'static> =
+ Migrations::new(vec![M::up(include_str!("atp_db.sql")),]);
+}
+
+#[derive(Debug)]
+pub struct AtpDatabase {
+ conn: Connection,
+}
+
+impl AtpDatabase {
+ pub fn open(path: &PathBuf) -> Result<Self> {
+ let mut conn = Connection::open(path)?;
+ MIGRATIONS.to_latest(&mut conn)?;
+ // any pragma would happen here
+ Ok(AtpDatabase { conn })
+ }
+
+ /// temporary database, eg for tests.
+ ///
+ /// TODO: should create a tmp file on ramdisk (/var/tmp?) instead of opening an in-memory
+ /// database. in-memory database can't be used with multiple connections
+ pub fn open_ephemeral() -> Result<Self> {
+ let mut conn = Connection::open_in_memory()?;
+ MIGRATIONS.to_latest(&mut conn)?;
+ // any pragma would happen here
+ Ok(AtpDatabase { conn })
+ }
+
+ /// Creates an entirely new connection to the same database
+ ///
+ /// Skips re-running migrations.
+ ///
+ /// Fails for ephemeral databases.
+ pub fn new_connection(&self) -> Result<Self> {
+ // TODO: let path = std::path::PathBuf::from(self.conn.path().ok_or(Err(anyhow!("expected real database")))?);
+ let path = std::path::PathBuf::from(self.conn.path().expect("expected real database"));
+ let conn = Connection::open(path)?;
+ Ok(AtpDatabase { conn })
+ }
+
+ pub fn get_record(&mut self, did: &str, collection: &str, tid: &str) -> Result<Value> {
+ let mut stmt = self.conn.prepare_cached(
+ "SELECT record_json FROM record WHERE did = ?1 collection = ?2 tid = ?3",
+ )?;
+ Ok(stmt.query_row(params!(did, collection, tid), |row| {
+ row.get(0).map(|v: String| Value::from_str(&v))
+ })??)
+ }
+
+ pub fn get_record_list(&mut self, did: &str, collection: &str) -> Result<Vec<String>> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT tid FROM record WHERE did = ?1 collection = ?2")?;
+ let ret = stmt
+ .query_and_then(params!(did, collection), |row| {
+ let v: String = row.get(0)?;
+ Ok(v)
+ })?
+ .collect();
+ ret
+ }
+
+ pub fn get_collection_list(&mut self, did: &str) -> Result<Vec<String>> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT collection FROM record WHERE did = ?1 GROUP BY collection")?;
+ let ret = stmt
+ .query_and_then(params!(did), |row| {
+ let v: String = row.get(0)?;
+ Ok(v)
+ })?
+ .collect();
+ ret
+ }
+
+ pub fn create_account(
+ &mut self,
+ username: &str,
+ password: &str,
+ email: &str,
+ ) -> Result<AtpSession> {
+ // TODO: validate email (regex?)
+ // TODO: validate username
+ // TODO: generate and store signing key
+ // TODO: generate plc did (randomly for now?)
+ // TODO: insert did_doc
+ // TODO: also need to initialize repo with... profile?
+ {
+ let password_bcrypt = bcrypt::hash(password, bcrypt::DEFAULT_COST)?;
+ let signing_key = "key:TODO";
+ let did = "did:TODO";
+ let mut stmt = self
+ .conn
+ .prepare_cached("INSERT INTO account (username, password_bcrypt, email, did, signing_key) VALUES (?1, ?2, ?3, ?4, ?5)")?;
+ stmt.execute(params!(username, password_bcrypt, email, did, signing_key))?;
+ }
+ self.create_session(username, password)
+ }
+
+ pub fn create_session(&mut self, username: &str, password: &str) -> Result<AtpSession> {
+ let mut stmt = self
+ .conn
+ .prepare_cached("SELECT password_bcrypt FROM account WHERE username = ?1")?;
+ let password_bcrypt: String = stmt.query_row(params!(username), |row| row.get(0))?;
+ if !bcrypt::verify(password, &password_bcrypt)? {
+ return Err(anyhow!("password did not match"));
+ }
+ // TODO: generate JWT
+ // TODO: insert session wtih JWT
+ Ok(AtpSession {
+ name: username.to_string(),
+ did: "did:TODO".to_string(),
+ jwt: "jwt:TODO".to_string(),
+ })
+ }
+}
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index 1785640..a8dc46a 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -1,23 +1,29 @@
use anyhow::Result;
use log::{error, info};
use rouille::{router, Request, Response};
+use std::path::PathBuf;
+use std::sync::Mutex;
-use futures::TryStreamExt;
use ipfs_sqlite_block_store::BlockStore;
-use iroh_car::CarReader;
-use libipld::block::Block;
-use std::path::PathBuf;
-use tokio::fs::File;
-use tokio::io::BufReader;
+mod car;
+mod db;
+mod models;
mod mst;
+pub use car::{load_car_to_blockstore, load_car_to_sqlite};
+pub use db::AtpDatabase;
+pub use models::*;
pub use mst::{dump_mst_keys, repro_mst};
-pub fn run_server(port: u16) -> Result<()> {
- // TODO: log access requests
+pub fn run_server(port: u16, blockstore_db_path: &PathBuf, atp_db_path: &PathBuf) -> Result<()> {
// TODO: some static files? https://github.com/tomaka/rouille/blob/master/examples/static-files.rs
+ // TODO: could just open connection on every request?
+ let db = Mutex::new(AtpDatabase::open(atp_db_path)?);
+ let mut _blockstore: BlockStore<libipld::DefaultParams> =
+ BlockStore::open(blockstore_db_path, Default::default())?;
+
let log_ok = |req: &Request, _resp: &Response, elap: std::time::Duration| {
info!("{} {} ({:?})", req.method(), req.raw_url(), elap);
};
@@ -43,50 +49,15 @@ pub fn run_server(port: u16) -> Result<()> {
Response::text("didn't get other thing")
// TODO: parse and echo back JSON body
},
+
+ (GET) ["/xrpc/com.atproto.getRecord"] => {
+ // TODO: JSON response
+ // TODO: handle error
+ let mut db = db.lock().unwrap().new_connection().unwrap();
+ Response::text(db.get_record("asdf", "123", "blah").unwrap().to_string())
+ },
_ => rouille::Response::empty_404()
)
})
});
}
-
-pub fn load_car_to_sqlite(db_path: &PathBuf, car_path: &PathBuf) -> Result<()> {
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open(db_path, Default::default())? };
-
- load_car_to_blockstore(&mut db, car_path)
-}
-
-pub fn load_car_to_blockstore(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> {
- let rt = tokio::runtime::Builder::new_current_thread()
- .enable_all()
- .build()?;
- rt.block_on(inner_car_loader(db, car_path))
-}
-
-// this async function is wrapped in the sync version above
-async fn inner_car_loader(db: &mut BlockStore<libipld::DefaultParams>, car_path: &PathBuf) -> Result<()> {
- println!("{} - {}", std::env::current_dir()?.display(), car_path.display());
- let car_reader = {
- let file = File::open(car_path).await?;
- let buf_reader = BufReader::new(file);
- CarReader::new(buf_reader).await?
- };
- let car_header = car_reader.header().clone();
-
- car_reader
- .stream()
- .try_for_each(|(cid, raw)| {
- // TODO: error handling here instead of unwrap (?)
- let block = Block::new(cid, raw).unwrap();
- db.put_block(block, None).unwrap();
- futures::future::ready(Ok(()))
- })
- .await?;
-
- // pin the header
- if car_header.roots().len() >= 1 {
- db.alias(b"import".to_vec(), Some(&car_header.roots()[0]))?;
- }
-
- Ok(())
-}
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
new file mode 100644
index 0000000..6f9bf81
--- /dev/null
+++ b/adenosine-pds/src/models.rs
@@ -0,0 +1,8 @@
+use serde;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct AtpSession {
+ pub jwt: String,
+ pub name: String,
+ pub did: String,
+}
diff --git a/adenosine-pds/src/mst.rs b/adenosine-pds/src/mst.rs
index a2a394f..429d8c8 100644
--- a/adenosine-pds/src/mst.rs
+++ b/adenosine-pds/src/mst.rs
@@ -1,3 +1,4 @@
+use crate::load_car_to_blockstore;
use anyhow::{anyhow, Result};
use ipfs_sqlite_block_store::BlockStore;
use libipld::cbor::DagCborCodec;
@@ -9,7 +10,6 @@ use libipld::{Cid, DagCbor};
use log::{debug, error, info};
use std::collections::BTreeMap;
use std::path::PathBuf;
-use crate::load_car_to_blockstore;
#[derive(Debug, DagCbor, PartialEq, Eq)]
struct CommitNode {
@@ -85,8 +85,7 @@ fn print_mst_keys(db: &mut BlockStore<libipld::DefaultParams>, cid: &Cid) -> Res
}
pub fn dump_mst_keys(db_path: &PathBuf) -> Result<()> {
- let mut db: BlockStore<libipld::DefaultParams> =
- { BlockStore::open(db_path, Default::default())? };
+ let mut db: BlockStore<libipld::DefaultParams> = BlockStore::open(db_path, Default::default())?;
let all_aliases: Vec<(Vec<u8>, Cid)> = db.aliases()?;
if all_aliases.is_empty() {
@@ -287,11 +286,9 @@ fn serialize_wip_tree(
}
pub fn repro_mst(car_path: &PathBuf) -> Result<()> {
-
// open a temp block store
- let mut db: BlockStore<libipld::DefaultParams> = {
- BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())?
- };
+ let mut db: BlockStore<libipld::DefaultParams> =
+ { BlockStore::open_path(ipfs_sqlite_block_store::DbPath::Memory, Default::default())? };
// load CAR contents from file
load_car_to_blockstore(&mut db, car_path)?;