From b8eea211866766aabde8c5e55d1061deb799ddc6 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 8 Nov 2022 19:06:05 -0800 Subject: pds: start implementing bsky database ops and XRPC endpoints --- adenosine-pds/src/bsky.rs | 233 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 adenosine-pds/src/bsky.rs (limited to 'adenosine-pds/src/bsky.rs') diff --git a/adenosine-pds/src/bsky.rs b/adenosine-pds/src/bsky.rs new file mode 100644 index 0000000..eeab9e3 --- /dev/null +++ b/adenosine-pds/src/bsky.rs @@ -0,0 +1,233 @@ +use crate::models::*; +use crate::repo::Mutation; +/// Helper functions for doing database and repo operations relating to bluesky endpoints and +/// records +use crate::{ + ipld_into_json_value, json_value_into_ipld, AtpDatabase, AtpService, Did, Result, Tid, +}; +use adenosine_cli::identifiers::{AtUri, Nsid}; +use libipld::Cid; +use rusqlite::params; +use serde_json::json; +use std::str::FromStr; + +/// Handles updating the database with creation, update, deletion of arbitrary records +pub fn bsky_mutate_db(db: &mut AtpDatabase, did: &Did, mutations: Vec) -> Result<()> { + // TODO: this function could probably be refactored + let bsky_post: Nsid = Nsid::from_str("app.bsky.feed.post").unwrap(); + let bsky_repost: Nsid = Nsid::from_str("app.bsky.feed.repost").unwrap(); + let bsky_like: Nsid = Nsid::from_str("app.bsky.feed.like").unwrap(); + let bsky_follow: Nsid = Nsid::from_str("app.bsky.graph.follow").unwrap(); + for m in mutations.into_iter() { + match m { + Mutation::Create(ref_type, tid, val) | Mutation::Update(ref_type, tid, val) + if ref_type == bsky_post => + { + db.bsky_upsert_post(did, &tid, Some(val))? + } + Mutation::Delete(ref_type, tid) if ref_type == bsky_post => { + db.bsky_upsert_post(did, &tid, None)? + } + Mutation::Create(ref_type, tid, val) | Mutation::Update(ref_type, tid, val) + if ref_type == bsky_repost => + { + db.bsky_upsert_ref("repost", did, &tid, Some(val))? + } + Mutation::Delete(ref_type, tid) if ref_type == bsky_repost => { + db.bsky_upsert_ref("repost", did, &tid, None)? + } + Mutation::Create(ref_type, tid, val) | Mutation::Update(ref_type, tid, val) + if ref_type == bsky_like => + { + db.bsky_upsert_ref("like", did, &tid, Some(val))? + } + Mutation::Delete(ref_type, tid) if ref_type == bsky_like => { + db.bsky_upsert_ref("like", did, &tid, None)? + } + Mutation::Create(ref_type, tid, val) | Mutation::Update(ref_type, tid, val) + if ref_type == bsky_follow => + { + db.bsky_upsert_follow(did, &tid, Some(val))? + } + Mutation::Delete(ref_type, tid) if ref_type == bsky_follow => { + db.bsky_upsert_follow(did, &tid, None)? + } + _ => (), + } + } + Ok(()) +} + +pub fn bsky_get_profile(srv: &mut AtpService, did: &Did) -> Result { + // first get the profile record + let mut profile_cid: Option = None; + let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap(); + let last_commit = srv.repo.get_commit(commit_cid)?; + let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?; + let prefix = "/app.bsky.actor.profile/"; + for (mst_key, cid) in full_map.iter() { + if mst_key.starts_with(&prefix) { + profile_cid = Some(*cid); + } + } + let (display_name, description): (Option, Option) = + if let Some(cid) = profile_cid { + let record: ProfileRecord = + serde_json::from_value(ipld_into_json_value(srv.repo.get_ipld(&cid)?))?; + (Some(record.displayName), record.description) + } else { + (None, None) + }; + let mut stmt = srv + .atp_db + .conn + .prepare_cached("SELECT handle FROM account WHERE did = $1")?; + let handle: String = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; + let mut stmt = srv + .atp_db + .conn + .prepare_cached("SELECT COUNT(*) FROM bsky_post WHERE did = $1")?; + let post_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; + let mut stmt = srv + .atp_db + .conn + .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND did = $1")?; + let follows_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; + let uri = format!("at://{}", did); + let mut stmt = srv + .atp_db + .conn + .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND uri = $1")?; + let followers_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + Ok(Profile { + did: did.to_string(), + handle: handle, + displayName: display_name, + description: description, + followersCount: followers_count, + followsCount: follows_count, + postsCount: post_count, + myState: json!({}), + }) +} + +pub fn bsky_update_profile(srv: &mut AtpService, did: &Did, profile: ProfileRecord) -> Result<()> { + // get the profile record + let mut profile_tid: Option = None; + let commit_cid = &srv.repo.lookup_commit(&did)?.unwrap(); + let last_commit = srv.repo.get_commit(commit_cid)?; + let full_map = srv.repo.mst_to_map(&last_commit.mst_cid)?; + let prefix = "/app.bsky.actor.profile/"; + for (mst_key, _cid) in full_map.iter() { + if mst_key.starts_with(&prefix) { + profile_tid = Some(Tid::from_str(mst_key.split('/').nth(1).unwrap())?); + } + } + let profile_tid: Tid = profile_tid.unwrap_or(srv.tid_gen.next_tid()); + let mutations: Vec = vec![Mutation::Update( + Nsid::from_str("app.bsky.actor.profile")?, + profile_tid, + json_value_into_ipld(serde_json::to_value(profile)?), + )]; + let keypair = srv.pds_keypair.clone(); + srv.repo.mutate_repo(&did, &mutations, &keypair)?; + Ok(()) +} + +struct FeedRow { + pub item_did: Did, + pub item_handle: String, + pub item_post_tid: Tid, + pub item_post_cid: Cid, +} + +fn feed_row(row: &rusqlite::Row) -> Result { + let item_did: String = row.get(0)?; + let item_did = Did::from_str(&item_did)?; + let item_handle = row.get(1)?; + let item_post_tid: String = row.get(2)?; + let item_post_tid = Tid::from_str(&item_post_tid)?; + let cid_string: String = row.get(3)?; + let item_post_cid = Cid::from_str(&cid_string)?; + Ok(FeedRow { + item_did, + item_handle, + item_post_tid, + item_post_cid, + }) +} + +fn feed_row_to_item(srv: &mut AtpService, row: FeedRow) -> Result { + let record_ipld = srv.repo.get_ipld(&row.item_post_cid)?; + let feed_item = FeedItem { + uri: format!( + "at://{}/{}/{}", + row.item_did, "app.bsky.feed.post", row.item_post_tid + ), + cid: row.item_post_cid.to_string(), + author: User { + did: row.item_did.to_string(), + handle: row.item_handle, + displayName: None, // TODO + }, + repostedBy: None, + record: ipld_into_json_value(record_ipld), + embed: None, + replyCount: 0, // TODO + repostCount: 0, // TODO + likeCount: 0, // TODO + indexedAt: "TODO".to_string(), // TODO + myState: None, + }; + Ok(feed_item) +} + +pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result { + let mut feed: Vec = vec![]; + let rows = { + let mut stmt = srv.atp_db + .conn + .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; + let mut sql_rows = stmt.query(params!(did.to_string()))?; + let mut rows: Vec = vec![]; + while let Some(sql_row) = sql_rows.next()? { + let row = feed_row(sql_row)?; + rows.push(row); + } + rows + }; + for row in rows { + feed.push(feed_row_to_item(srv, row)?); + } + Ok(GenericFeed { feed }) +} + +pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result { + let mut feed: Vec = vec![]; + let rows = { + let mut stmt = srv.atp_db + .conn + .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; + let mut sql_rows = stmt.query(params!(did.to_string()))?; + let mut rows: Vec = vec![]; + while let Some(sql_row) = sql_rows.next()? { + let row = feed_row(sql_row)?; + rows.push(row); + } + rows + }; + for row in rows { + feed.push(feed_row_to_item(srv, row)?); + } + Ok(GenericFeed { feed }) +} + +pub fn bsky_get_thread( + _srv: &mut AtpService, + _uri: &AtUri, + _depth: Option, +) -> Result { + // TODO: what is the best way to implement this? recurisvely? just first-level children to + // start? + unimplemented!() +} -- cgit v1.2.3