summaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src/db.rs
diff options
context:
space:
mode:
Diffstat (limited to 'adenosine-pds/src/db.rs')
-rw-r--r--adenosine-pds/src/db.rs87
1 files changed, 85 insertions, 2 deletions
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
index 2249637..384b498 100644
--- a/adenosine-pds/src/db.rs
+++ b/adenosine-pds/src/db.rs
@@ -1,7 +1,11 @@
+use crate::models::{FollowRecord, Post, RefRecord};
/// ATP database (as distinct from blockstore)
-use crate::{AtpSession, Did, KeyPair};
+use crate::{ipld_into_json_value, AtpSession, Did, KeyPair, Tid};
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
+use libipld::cbor::DagCborCodec;
+use libipld::multihash::Code;
+use libipld::{Block, DefaultParams, Ipld};
use log::debug;
use rusqlite::{params, Connection, OptionalExtension};
use rusqlite_migration::{Migrations, M};
@@ -29,7 +33,7 @@ lazy_static! {
#[derive(Debug)]
pub struct AtpDatabase {
- conn: Connection,
+ pub conn: Connection,
}
impl AtpDatabase {
@@ -156,4 +160,83 @@ impl AtpDatabase {
let doc_json: String = stmt.query_row(params!(did.to_string()), |row| row.get(0))?;
Ok(Value::from_str(&doc_json)?)
}
+
+ pub fn bsky_upsert_post(&mut self, did: &Did, tid: &Tid, val: Option<Ipld>) -> Result<()> {
+ if let Some(val) = val {
+ // need to re-compute the CID from DagCbor re-encoding, I guess. bleh.
+ let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, &val)?;
+ let cid = *block.cid();
+ let post: Post = serde_json::from_value(ipld_into_json_value(val))?;
+ let mut stmt = self
+ .conn
+ .prepare_cached("INSERT INTO bsky_post (did, tid, cid, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)")?;
+ stmt.execute(params!(
+ did.to_string(),
+ tid.to_string(),
+ cid.to_string(),
+ serde_json::to_string(&post)?,
+ post.createdAt
+ ))?;
+ } else {
+ let mut stmt = self
+ .conn
+ .prepare_cached("DELETE FROM bsky_post WHERE did = ?1 AND tid = ?2")?;
+ stmt.execute(params!(did.to_string(), tid.to_string()))?;
+ }
+ Ok(())
+ }
+
+ pub fn bsky_upsert_ref(
+ &mut self,
+ ref_type: &str,
+ did: &Did,
+ tid: &Tid,
+ val: Option<Ipld>,
+ ) -> Result<()> {
+ if let Some(val) = val {
+ let ref_obj: RefRecord = serde_json::from_value(ipld_into_json_value(val))?;
+ let mut stmt = self
+ .conn
+ .prepare_cached("INSERT INTO bsky_ref (ref_type, did, tid, subject_uri, subject_cid, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")?;
+ stmt.execute(params!(
+ ref_type.to_string(),
+ did.to_string(),
+ tid.to_string(),
+ ref_obj.subject.uri,
+ ref_obj.subject.cid,
+ ref_obj.createdAt
+ ))?;
+ } else {
+ let mut stmt = self.conn.prepare_cached(
+ "DELETE FROM bsky_ref WHERE ref_type = ?1 AND did = ?2 AND tid = ?3",
+ )?;
+ stmt.execute(params!(
+ ref_type.to_string(),
+ did.to_string(),
+ tid.to_string()
+ ))?;
+ }
+ Ok(())
+ }
+
+ pub fn bsky_upsert_follow(&mut self, did: &Did, tid: &Tid, val: Option<Ipld>) -> Result<()> {
+ if let Some(val) = val {
+ let follow: FollowRecord = serde_json::from_value(ipld_into_json_value(val))?;
+ let mut stmt = self
+ .conn
+ .prepare_cached("INSERT INTO bsky_follow (did, tid, subject_did, created_at) VALUES (?1, ?2, ?3, ?4)")?;
+ stmt.execute(params!(
+ did.to_string(),
+ tid.to_string(),
+ follow.subject.did,
+ follow.createdAt
+ ))?;
+ } else {
+ let mut stmt = self
+ .conn
+ .prepare_cached("DELETE FROM bsky_follow WHERE did = ?2 AND tid = ?3")?;
+ stmt.execute(params!(did.to_string(), tid.to_string()))?;
+ }
+ Ok(())
+ }
}