From 10a90a353a3bb46652a42b0e01834fd158fd600d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 31 Oct 2022 15:14:36 -0700 Subject: PDS: early progress --- adenosine-pds/plan.txt | 148 ++++++++++++++++++++++++++++++++- adenosine-pds/src/bin/adenosine-pds.rs | 90 ++++++++++++++++++++ adenosine-pds/src/lib.rs | 42 ++++++++++ adenosine-pds/src/main.rs | 3 - 4 files changed, 278 insertions(+), 5 deletions(-) create mode 100644 adenosine-pds/src/bin/adenosine-pds.rs create mode 100644 adenosine-pds/src/lib.rs delete mode 100644 adenosine-pds/src/main.rs diff --git a/adenosine-pds/plan.txt b/adenosine-pds/plan.txt index d615d77..5f6a8f0 100644 --- a/adenosine-pds/plan.txt +++ b/adenosine-pds/plan.txt @@ -3,13 +3,50 @@ PDS proof of concept: x ipld sqlite driver importing CAR file => simple binary, two args - MST code to read and mutate tree state + => just read the whole tree and then write the whole tree => with tests -- implement basic non-authenticated CRUD on repository +- skeleton + env config: DB paths, port + commands: serve, import, resolve, generate-secret + warp: homepage, serving XRPC endpoint (or individually?) + database wrappers + integration tests +- implement basic non-authenticated CRUD on repository, test with CLI + createAccount + repoGetRecord + repoListRecords + repoBatchWrite + repoCreateRecord + repoPutRecord + repoDeleteRecord + syncGetRepo + syncGetRoot + syncUpdateRepo ? python test script - sqlite schema (for application) - write wrapper which updates MST *and* updates other tables in a transaction - JSON schema type generation (separate crate?) 
-- HTTP API handler implementing most endpoints +- HTTP API handler implementing many endpoints + com.atproto + createSession + getAccountsConfig + getSession + repoDescribe + resolveName + app.bsky + getHomeFeed + getAuthorFeed + getLikedBy + getNotificationCount + getNotifications + getPostThread + getProfile + getRepostedBy + getUserFollowers + getUserFollows + getUsersSearch + postNotificationsSeen + updateProfile - did:web handler? other utils/helpers: @@ -21,8 +58,115 @@ libraries: - `rusqlite` with "bundled" sqlite for datastore - `ipfs-sqlite-block-store` and `libipld` to parse and persist repo content - `warp` as async HTTP service +- `deadpool-sqlite` or `tokio-rusqlite` to use rusqlite from async code? - `r2d2` to wrap rusqlite (?) - pretty_env_logger - ??? for CBOR (de)serialization of MST, separate from the IPLD stuff? - no good crate for working with CAR files... could rip out this code? https://github.com/n0-computer/iroh/tree/main/iroh-car + +## concurrency (in warp app) + +note that there isn't really any point in having this be async, given that we +just have a single shared sqlite on disk. could try `rouille` instead of +`warp`? + +maybe good for remote stuff like did:web resolution? + +could try using sqlx instead of rusqlite for natively-async database stuff? + +for block store: +- open a single connection at startup, store in mutex +- handlers get a reference to mutex. if they need a connection, they enter a blocking thread then: + block on the mutex, then create a new connection, unlock the mutex + do any operations on connection synchronously + exit the block + +## system tables + +account + did (PK) + username (UNIQUE, indexed) + email (UNIQUE) + password_bcrypt + signing_key + +did_doc + did (PK) + seen_at (timestamp) + +session + did + jwt + ??? + +repo + did + head_commit + +record (should this exist? good for queries) + did + collection + tid + record_cid + record_cbor (CBOR bytes? JSON?) 
+
+password_reset
+    did
+    token
+
+
+## atp tables
+
+what actually needs to be indexed?
+- post replies (forwards and backwards)
+- likes (back index)
+- follows (back index)
+- usernames (as part of profile?)
+- mentions? hashtags?
+
+additional state
+- notifications
+
+bsky_post
+    did
+    tid (or timestamp from tid?)
+    text
+    reply_root (nullable)
+    reply_parent (nullable)
+    entities: JSON (?)
+
+bsky_profile
+    did
+    tid
+    display_name
+    description
+    my_state (JSON)
+
+bsky_follow
+    did
+    tid
+    target_did
+
+bsky_like
+    did
+    tid
+    target_uri
+    target_cid (what is this? the commit, or record CID?)
+
+bsky_repost
+    did
+    tid
+    target_uri
+    target_cid
+
+bsky_notification
+    did
+    created_at (timestamp)
+    seen (boolean)
+    reason
+    target_uri
+
+TODO:
+- bsky_badge (etc)
+- bsky_media_embed
diff --git a/adenosine-pds/src/bin/adenosine-pds.rs b/adenosine-pds/src/bin/adenosine-pds.rs
new file mode 100644
index 0000000..3a00fac
--- /dev/null
+++ b/adenosine-pds/src/bin/adenosine-pds.rs
@@ -0,0 +1,94 @@
+use adenosine_pds::*;
+use anyhow::Result;
+
+use log::{self, debug};
+use structopt::StructOpt;
+
+
+#[derive(StructOpt)]
+#[structopt(
+    rename_all = "kebab-case",
+    about = "personal digital server (PDS) implementation for AT protocol (atproto.com)"
+)]
+struct Opt {
+    // TODO: different path type for structopt?
+
+    /// File path of sqlite database for storing IPLD blocks (aka, repository content)
+    #[structopt(
+        parse(from_os_str),
+        global = true,
+        long = "--block-db",
+        env = "ATP_BLOCK_DB",
+        default_value = "adenosine_pds_blockstore.sqlite"
+    )]
+    blockstore_db_path: std::path::PathBuf,
+
+    /// File path of sqlite database for ATP service (user accounts, indices, etc)
+    #[structopt(
+        parse(from_os_str),
+        global = true,
+        long = "--atp-db",
+        env = "ATP_ATP_DB",
+        default_value = "adenosine_pds_atp.sqlite"
+    )]
+    atp_db_path: std::path::PathBuf,
+
+    /// Log more messages. Pass multiple times for ever more verbosity
+    #[structopt(global = true, long, short = "v", parse(from_occurrences))]
+    verbose: i8,
+
+    #[structopt(subcommand)]
+    cmd: Command,
+}
+
+#[derive(StructOpt)]
+enum Command {
+    /// Start ATP server as a foreground process
+    Serve,
+
+    /// Import a CAR file (TODO)
+    Import,
+
+    /// Dump info from databases (TODO)
+    Inspect,
+}
+
+/// Entry point: parses CLI options, configures logging, then dispatches the subcommand.
+///
+/// `RUST_LOG`, when set, takes precedence over the filter derived from the `-v` count.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let opt = Opt::from_args();
+
+    let log_level = match opt.verbose {
+        // `from_occurrences` only counts upward; this arm just keeps the match exhaustive
+        i8::MIN..=-1 => "off",
+        0 => "warn",
+        1 => "info",
+        2 => "debug",
+        3..=i8::MAX => "trace",
+    };
+    // hyper logging is very verbose, so crank that down even if everything else is more verbose
+    let cli_filter = format!("{},hyper=error", log_level);
+    // defer to env var config, fallback to CLI settings
+    let log_filter = std::env::var("RUST_LOG").unwrap_or(cli_filter);
+    pretty_env_logger::formatted_timed_builder()
+        .parse_filters(&log_filter)
+        .init();
+
+    debug!("config parsed, starting up");
+
+    match opt.cmd {
+        Command::Serve => {
+            // TODO: log some config stuff?
+            run_server().await
+        },
+        Command::Import => {
+            unimplemented!()
+        },
+        Command::Inspect => {
+            unimplemented!()
+        },
+    }
+}
+
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
new file mode 100644
index 0000000..bb34c80
--- /dev/null
+++ b/adenosine-pds/src/lib.rs
@@ -0,0 +1,48 @@
+
+use anyhow::Result;
+use log::{self, debug};
+use warp::Filter;
+use warp::reply::Response;
+use std::collections::HashMap;
+
+/// Serves the placeholder PDS HTTP API on 127.0.0.1:3030.
+///
+/// Routes:
+/// - `GET /`: static greeting
+/// - `GET /xrpc/some.method`: echoes the query parameters back as a JSON object
+/// - `POST /xrpc/other.method`: echoes the JSON request body back
+pub async fn run_server() -> Result<()> {
+
+    // GET /
+    let homepage = warp::path::end().map(|| "Not much to see here yet!");
+
+    // GET /xrpc/some.method w/ query params
+    let xrpc_some_get = warp::get()
+        .and(warp::path!("xrpc" / "some.method"))
+        .and(warp::query::<HashMap<String, String>>())
+        .map(|query_params: HashMap<String, String>| {
+            println!("query params: {:?}", query_params);
+            // return query params as a JSON map object
+            warp::reply::json(&query_params)
+        });
+
+    // POST /xrpc/other.method w/ query params
+    let xrpc_other_post = warp::post()
+        .and(warp::path!("xrpc" / "other.method"))
+        .and(warp::query::<HashMap<String, String>>())
+        .and(warp::body::json())
+        .map(|query_params: HashMap<String, String>, body_val: serde_json::Value| {
+            println!("query params: {:?}", query_params);
+            println!("body JSON: {}", body_val);
+            // echo it back
+            warp::reply::json(&body_val)
+        });
+
+    let routes = homepage.or(xrpc_some_get).or(xrpc_other_post).with(warp::log("adenosine-pds"));
+    warp::serve(routes)
+        .run(([127, 0, 0, 1], 3030))
+        .await;
+    Ok(())
+}
+
+// TODO: tokio::task::spawn_blocking
diff --git a/adenosine-pds/src/main.rs b/adenosine-pds/src/main.rs
deleted file mode 100644
index e7a11a9..0000000
--- a/adenosine-pds/src/main.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-fn main() {
-    println!("Hello, world!");
-}
-- 
cgit v1.2.3