diff options
author | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-08 19:06:05 -0800 |
---|---|---|
committer | Bryan Newbold <bnewbold@robocracy.org> | 2022-11-08 19:06:05 -0800 |
commit | b8eea211866766aabde8c5e55d1061deb799ddc6 (patch) | |
tree | ef11ca35c7bdd8b80744bfe7ba71a299e737ade3 /adenosine-pds/src/db.rs | |
parent | 02cd7b33d090db2aa47126a4d1aeecb247e7b7ef (diff) | |
download | adenosine-b8eea211866766aabde8c5e55d1061deb799ddc6.tar.gz adenosine-b8eea211866766aabde8c5e55d1061deb799ddc6.zip |
pds: start implementing bsky database ops and XRPC endpoints
Diffstat (limited to 'adenosine-pds/src/db.rs')
-rw-r--r-- | adenosine-pds/src/db.rs | 87 |
1 files changed, 85 insertions, 2 deletions
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 2249637..384b498 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -1,7 +1,11 @@ +use crate::models::{FollowRecord, Post, RefRecord}; /// ATP database (as distinct from blockstore) -use crate::{AtpSession, Did, KeyPair}; +use crate::{ipld_into_json_value, AtpSession, Did, KeyPair, Tid}; use anyhow::{anyhow, Result}; use lazy_static::lazy_static; +use libipld::cbor::DagCborCodec; +use libipld::multihash::Code; +use libipld::{Block, DefaultParams, Ipld}; use log::debug; use rusqlite::{params, Connection, OptionalExtension}; use rusqlite_migration::{Migrations, M}; @@ -29,7 +33,7 @@ lazy_static! { #[derive(Debug)] pub struct AtpDatabase { - conn: Connection, + pub conn: Connection, } impl AtpDatabase { @@ -156,4 +160,83 @@ impl AtpDatabase { let doc_json: String = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; Ok(Value::from_str(&doc_json)?) } + + pub fn bsky_upsert_post(&mut self, did: &Did, tid: &Tid, val: Option<Ipld>) -> Result<()> { + if let Some(val) = val { + // need to re-compute the CID from DagCbor re-encoding, I guess. bleh. + let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, &val)?; + let cid = *block.cid(); + let post: Post = serde_json::from_value(ipld_into_json_value(val))?; + let mut stmt = self + .conn + .prepare_cached("INSERT INTO bsky_post (did, tid, cid, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)")?; + stmt.execute(params!( + did.to_string(), + tid.to_string(), + cid.to_string(), + serde_json::to_string(&post)?, + post.createdAt + ))?; + } else { + let mut stmt = self + .conn + .prepare_cached("DELETE FROM bsky_post WHERE did = ?1 AND tid = ?2")?; + stmt.execute(params!(did.to_string(), tid.to_string()))?; + } + Ok(()) + } + + pub fn bsky_upsert_ref( + &mut self, + ref_type: &str, + did: &Did, + tid: &Tid, + val: Option<Ipld>, + ) -> Result<()> { + if let Some(val) = val { + let ref_obj: RefRecord = serde_json::from_value(ipld_into_json_value(val))?; + let mut stmt = self + .conn + .prepare_cached("INSERT INTO bsky_ref (ref_type, did, tid, subject_uri, subject_cid, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")?; + stmt.execute(params!( + ref_type.to_string(), + did.to_string(), + tid.to_string(), + ref_obj.subject.uri, + ref_obj.subject.cid, + ref_obj.createdAt + ))?; + } else { + let mut stmt = self.conn.prepare_cached( + "DELETE FROM bsky_ref WHERE ref_type = ?1 AND did = ?2 AND tid = ?3", + )?; + stmt.execute(params!( + ref_type.to_string(), + did.to_string(), + tid.to_string() + ))?; + } + Ok(()) + } + + pub fn bsky_upsert_follow(&mut self, did: &Did, tid: &Tid, val: Option<Ipld>) -> Result<()> { + if let Some(val) = val { + let follow: FollowRecord = serde_json::from_value(ipld_into_json_value(val))?; + let mut stmt = self + .conn + .prepare_cached("INSERT INTO bsky_follow (did, tid, subject_did, created_at) VALUES (?1, ?2, ?3, ?4)")?; + stmt.execute(params!( + did.to_string(), + tid.to_string(), + follow.subject.did, + follow.createdAt + ))?; + } else { + let mut stmt = self + .conn + .prepare_cached("DELETE FROM bsky_follow WHERE did = ?2 AND tid = ?3")?; + stmt.execute(params!(did.to_string(), tid.to_string()))?; + } + Ok(()) + } } |