summaryrefslogtreecommitdiffstats
path: root/adenosine-pds/src/bsky.rs
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-11-09 15:35:17 -0800
committerBryan Newbold <bnewbold@robocracy.org>2022-11-09 15:35:38 -0800
commit7af51292ddfc81d55dd5beff07c557757bb0075f (patch)
tree6601cc2c73adab784f791f650e08fd9dc92aa65b /adenosine-pds/src/bsky.rs
parentb8eea211866766aabde8c5e55d1061deb799ddc6 (diff)
downloadadenosine-7af51292ddfc81d55dd5beff07c557757bb0075f.tar.gz
adenosine-7af51292ddfc81d55dd5beff07c557757bb0075f.zip
pds: more bsky posts/feeds/follow/like progress
Diffstat (limited to 'adenosine-pds/src/bsky.rs')
-rw-r--r--adenosine-pds/src/bsky.rs372
1 files changed, 353 insertions, 19 deletions
diff --git a/adenosine-pds/src/bsky.rs b/adenosine-pds/src/bsky.rs
index eeab9e3..66b05eb 100644
--- a/adenosine-pds/src/bsky.rs
+++ b/adenosine-pds/src/bsky.rs
@@ -91,14 +91,13 @@ pub fn bsky_get_profile(srv: &mut AtpService, did: &Did) -> Result<Profile> {
let mut stmt = srv
.atp_db
.conn
- .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND did = $1")?;
+ .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE did = $1")?;
let follows_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?;
- let uri = format!("at://{}", did);
let mut stmt = srv
.atp_db
.conn
- .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND uri = $1")?;
- let followers_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+ .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE subject_did = $1")?;
+ let followers_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?;
Ok(Profile {
did: did.to_string(),
handle: handle,
@@ -120,7 +119,7 @@ pub fn bsky_update_profile(srv: &mut AtpService, did: &Did, profile: ProfileReco
let prefix = "/app.bsky.actor.profile/";
for (mst_key, _cid) in full_map.iter() {
if mst_key.starts_with(&prefix) {
- profile_tid = Some(Tid::from_str(mst_key.split('/').nth(1).unwrap())?);
+ profile_tid = Some(Tid::from_str(mst_key.split('/').nth(2).unwrap())?);
}
}
let profile_tid: Tid = profile_tid.unwrap_or(srv.tid_gen.next_tid());
@@ -139,6 +138,7 @@ struct FeedRow {
pub item_handle: String,
pub item_post_tid: Tid,
pub item_post_cid: Cid,
+ pub indexed_at: String,
}
fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> {
@@ -149,45 +149,66 @@ fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> {
let item_post_tid = Tid::from_str(&item_post_tid)?;
let cid_string: String = row.get(3)?;
let item_post_cid = Cid::from_str(&cid_string)?;
+ let indexed_at: String = row.get(4)?;
Ok(FeedRow {
item_did,
item_handle,
item_post_tid,
item_post_cid,
+ indexed_at,
})
}
fn feed_row_to_item(srv: &mut AtpService, row: FeedRow) -> Result<FeedItem> {
let record_ipld = srv.repo.get_ipld(&row.item_post_cid)?;
+ let uri = format!(
+ "at://{}/{}/{}",
+ row.item_did, "app.bsky.feed.post", row.item_post_tid
+ );
+
+ let mut stmt = srv.atp_db.conn.prepare_cached(
+ "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'like' AND subject_uri = $1",
+ )?;
+ let like_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+
+ let mut stmt = srv.atp_db.conn.prepare_cached(
+ "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'repost' AND subject_uri = $1",
+ )?;
+ let repost_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+
+ let mut stmt = srv
+ .atp_db
+ .conn
+ .prepare_cached("SELECT COUNT(*) FROM bsky_post WHERE reply_to_parent_uri = $1")?;
+ let reply_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+
let feed_item = FeedItem {
- uri: format!(
- "at://{}/{}/{}",
- row.item_did, "app.bsky.feed.post", row.item_post_tid
- ),
- cid: row.item_post_cid.to_string(),
+ uri: uri,
+ cid: Some(row.item_post_cid.to_string()),
author: User {
did: row.item_did.to_string(),
handle: row.item_handle,
- displayName: None, // TODO
+ displayName: None, // TODO: fetch from profile (or cache)
},
repostedBy: None,
record: ipld_into_json_value(record_ipld),
embed: None,
- replyCount: 0, // TODO
- repostCount: 0, // TODO
- likeCount: 0, // TODO
- indexedAt: "TODO".to_string(), // TODO
+ replyCount: reply_count,
+ repostCount: repost_count,
+ likeCount: like_count,
+ indexedAt: row.indexed_at,
myState: None,
};
Ok(feed_item)
}
-pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
+pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
let mut feed: Vec<FeedItem> = vec![];
+ // TODO: also handle reposts
let rows = {
let mut stmt = srv.atp_db
.conn
- .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;
+ .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 AND account.did IS NOT NULL ORDER BY bsky_post.tid DESC LIMIT 20")?;
let mut sql_rows = stmt.query(params!(did.to_string()))?;
let mut rows: Vec<FeedRow> = vec![];
while let Some(sql_row) = sql_rows.next()? {
@@ -202,12 +223,13 @@ pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFe
Ok(GenericFeed { feed })
}
-pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
+pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
let mut feed: Vec<FeedItem> = vec![];
+ // TODO: also handle reposts
let rows = {
let mut stmt = srv.atp_db
.conn
- .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;
+ .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;
let mut sql_rows = stmt.query(params!(did.to_string()))?;
let mut rows: Vec<FeedRow> = vec![];
while let Some(sql_row) = sql_rows.next()? {
@@ -231,3 +253,315 @@ pub fn bsky_get_thread(
// start?
unimplemented!()
}
+
+#[test]
+fn test_bsky_profile() {
+ use crate::{create_account, created_at_now};
+ use libipld::ipld;
+
+ let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap();
+ let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap();
+
+ let mut srv = AtpService::new_ephemeral().unwrap();
+ let req = AccountRequest {
+ email: "test@bogus.com".to_string(),
+ handle: "handle.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ let did = Did::from_str(&session.did).unwrap();
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.did, session.did);
+ assert_eq!(profile.handle, req.handle);
+ assert_eq!(profile.displayName, None);
+ assert_eq!(profile.description, None);
+ assert_eq!(profile.followersCount, 0);
+ assert_eq!(profile.followsCount, 0);
+ assert_eq!(profile.postsCount, 0);
+
+ let record = ProfileRecord {
+ displayName: "Test Name".to_string(),
+ description: Some("short description".to_string()),
+ };
+ bsky_update_profile(&mut srv, &did, record.clone()).unwrap();
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.displayName, Some(record.displayName));
+ assert_eq!(profile.description, record.description);
+
+ let record = ProfileRecord {
+ displayName: "New Test Name".to_string(),
+ description: Some("longer description".to_string()),
+ };
+ bsky_update_profile(&mut srv, &did, record.clone()).unwrap();
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.displayName, Some(record.displayName));
+ assert_eq!(profile.description, record.description);
+
+ let mutations = vec![
+ Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": session.did}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": "did:web:external.domain"}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "first post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "second post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "third post"}),
+ ),
+ ];
+ srv.repo
+ .mutate_repo(&did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &did, mutations).unwrap();
+
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.followersCount, 1);
+ assert_eq!(profile.followsCount, 2);
+ assert_eq!(profile.postsCount, 3);
+}
+
+#[test]
+fn test_bsky_feeds() {
+ use crate::{create_account, created_at_now};
+ use libipld::ipld;
+
+ let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap();
+ let like_nsid = Nsid::from_str("app.bsky.feed.like").unwrap();
+ let repost_nsid = Nsid::from_str("app.bsky.feed.repost").unwrap();
+ let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap();
+
+ let mut srv = AtpService::new_ephemeral().unwrap();
+ let alice_did = {
+ let req = AccountRequest {
+ email: "alice@bogus.com".to_string(),
+ handle: "alice.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ Did::from_str(&session.did).unwrap()
+ };
+ let bob_did = {
+ let req = AccountRequest {
+ email: "bob@bogus.com".to_string(),
+ handle: "bob.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ Did::from_str(&session.did).unwrap()
+ };
+ let carol_did = {
+ let req = AccountRequest {
+ email: "carol@bogus.com".to_string(),
+ handle: "carol.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ Did::from_str(&session.did).unwrap()
+ };
+
+ // all feeds and timelines should be empty
+ let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap();
+ let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap();
+ assert!(alice_feed.feed.is_empty());
+ assert!(alice_timeline.feed.is_empty());
+ let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap();
+ let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap();
+ assert!(bob_feed.feed.is_empty());
+ assert!(bob_timeline.feed.is_empty());
+ let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap();
+ let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap();
+ assert!(carol_feed.feed.is_empty());
+ assert!(carol_timeline.feed.is_empty());
+
+ // alice does some posts
+ let alice_post1_tid = srv.tid_gen.next_tid();
+ let alice_post2_tid = srv.tid_gen.next_tid();
+ let alice_post3_tid = srv.tid_gen.next_tid();
+ assert!(alice_post1_tid < alice_post2_tid && alice_post2_tid < alice_post3_tid);
+ let mutations = vec![
+ Mutation::Create(
+ post_nsid.clone(),
+ alice_post1_tid.clone(),
+ ipld!({"text": "alice first post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ alice_post2_tid.clone(),
+ ipld!({"text": "alice second post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ alice_post3_tid.clone(),
+ ipld!({"text": "alice third post"}),
+ ),
+ ];
+ srv.repo
+ .mutate_repo(&alice_did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &alice_did, mutations).unwrap();
+
+ // bob follows alice, likes first post, reposts second, replies third
+ let alice_post3_uri = format!(
+ "at://{}/{}/{}",
+ alice_did.to_string(),
+ post_nsid.to_string(),
+ alice_post3_tid.to_string()
+ );
+ let mutations = vec![
+ Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": alice_did.to_string()}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ like_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string())}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ repost_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post2_tid.to_string())}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "bob comment on alice post3", "reply": {"parent": {"uri": alice_post3_uri.clone()}, "root": {"uri": alice_post3_uri.clone()}}}),
+ ),
+ ];
+ srv.repo
+ .mutate_repo(&bob_did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &bob_did, mutations).unwrap();
+
+ // carol follows bob
+ let mutations = vec![Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": bob_did.to_string()}, "createdAt": created_at_now()}),
+ )];
+ srv.repo
+ .mutate_repo(&bob_did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &carol_did, mutations).unwrap();
+
+ // test alice profile: counts should be updated
+ let alice_profile = bsky_get_profile(&mut srv, &alice_did).unwrap();
+ assert_eq!(alice_profile.followersCount, 1);
+ assert_eq!(alice_profile.followsCount, 0);
+ assert_eq!(alice_profile.postsCount, 3);
+
+ // test alice timeline: still empty (?)
+ let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap();
+ println!("{:?}", alice_timeline);
+ assert!(alice_timeline.feed.is_empty());
+
+ // test alice feed: should have 3 posts, with correct counts
+ let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap();
+ assert_eq!(alice_feed.feed.len(), 3);
+
+ assert_eq!(
+ alice_feed.feed[2].uri,
+ format!(
+ "at://{}/{}/{}",
+ alice_did.to_string(),
+ post_nsid.to_string(),
+ alice_post1_tid.to_string()
+ )
+ );
+ // TODO: CID
+ assert_eq!(alice_feed.feed[2].author.did, alice_did.to_string());
+ assert_eq!(alice_feed.feed[2].author.handle, "alice.test");
+ assert_eq!(alice_feed.feed[2].repostedBy, None);
+ assert_eq!(
+ alice_feed.feed[2].record["text"].as_str().unwrap(),
+ "alice first post"
+ );
+ assert_eq!(alice_feed.feed[2].embed, None);
+ assert_eq!(alice_feed.feed[2].replyCount, 0);
+ assert_eq!(alice_feed.feed[2].repostCount, 0);
+ assert_eq!(alice_feed.feed[2].likeCount, 1);
+
+ assert_eq!(alice_feed.feed[1].author.did, alice_did.to_string());
+ assert_eq!(alice_feed.feed[1].replyCount, 0);
+ assert_eq!(alice_feed.feed[1].repostCount, 1);
+ assert_eq!(alice_feed.feed[1].likeCount, 0);
+
+ assert_eq!(alice_feed.feed[0].author.did, alice_did.to_string());
+ assert_eq!(alice_feed.feed[0].replyCount, 1);
+ assert_eq!(alice_feed.feed[0].repostCount, 0);
+ assert_eq!(alice_feed.feed[0].likeCount, 0);
+
+ // test bob timeline: should include alice posts
+ let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap();
+ println!("BOB TIMELINE ======");
+ for item in bob_timeline.feed.iter() {
+ println!("{:?}", item);
+ }
+ assert_eq!(bob_timeline.feed.len(), 3);
+ assert_eq!(
+ bob_timeline.feed[2].uri,
+ format!(
+ "at://{}/{}/{}",
+ alice_did.to_string(),
+ post_nsid.to_string(),
+ alice_post1_tid.to_string()
+ )
+ );
+ // TODO: CID
+ assert_eq!(bob_timeline.feed[2].author.did, alice_did.to_string());
+ assert_eq!(bob_timeline.feed[2].author.handle, "alice.test");
+
+ // test bob feed: should include repost and reply
+ let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap();
+ assert_eq!(bob_feed.feed.len(), 1);
+ // TODO: handle reposts
+ /*
+ assert_eq!(bob_feed.feed.len(), 2);
+ assert_eq!(bob_feed.feed[1].uri, format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string()));
+ // TODO: CID
+ assert_eq!(bob_feed.feed[1].author.did, alice_did.to_string());
+ assert_eq!(bob_feed.feed[1].author.handle, "alice.test");
+ assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().did, bob_did.to_string());
+ assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().handle, "bob.test");
+ // TODO: "is a repost" (check record?)
+ */
+
+ assert_eq!(bob_feed.feed[0].author.did, bob_did.to_string());
+ assert_eq!(bob_feed.feed[0].author.handle, "bob.test");
+
+ // test carol timeline: should include bob's repost and reply
+ let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap();
+ // TODO: handle re-posts (+1 here)
+ assert_eq!(carol_timeline.feed.len(), 1);
+ // TODO: details
+
+ // test carol feed: still empty
+ let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap();
+ assert!(carol_feed.feed.is_empty());
+}
+
+// TODO: post threads (include in the above test?)