aboutsummaryrefslogtreecommitdiffstats
path: root/adenosine-pds
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2022-11-09 15:35:17 -0800
committerBryan Newbold <bnewbold@robocracy.org>2022-11-09 15:35:38 -0800
commit7af51292ddfc81d55dd5beff07c557757bb0075f (patch)
tree6601cc2c73adab784f791f650e08fd9dc92aa65b /adenosine-pds
parentb8eea211866766aabde8c5e55d1061deb799ddc6 (diff)
downloadadenosine-7af51292ddfc81d55dd5beff07c557757bb0075f.tar.gz
adenosine-7af51292ddfc81d55dd5beff07c557757bb0075f.zip
pds: more bsky posts/feeds/follow/like progress
Diffstat (limited to 'adenosine-pds')
-rw-r--r--adenosine-pds/Cargo.toml1
-rw-r--r--adenosine-pds/src/atp_db.sql15
-rw-r--r--adenosine-pds/src/bsky.rs372
-rw-r--r--adenosine-pds/src/db.rs12
-rw-r--r--adenosine-pds/src/lib.rs62
-rw-r--r--adenosine-pds/src/models.rs17
6 files changed, 427 insertions, 52 deletions
diff --git a/adenosine-pds/Cargo.toml b/adenosine-pds/Cargo.toml
index 7075e65..1637164 100644
--- a/adenosine-pds/Cargo.toml
+++ b/adenosine-pds/Cargo.toml
@@ -44,6 +44,7 @@ bs58 = "*"
async-trait = "*"
dotenv = "*"
askama = "0.11"
+time = { version = "*", features = ["formatting"] }
# for vendored iroh-car
thiserror = "1.0"
diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql
index 71fbf6d..9a6e30c 100644
--- a/adenosine-pds/src/atp_db.sql
+++ b/adenosine-pds/src/atp_db.sql
@@ -33,19 +33,22 @@ CREATE TABLE bsky_post(
tid TEXT NOT NULL,
cid TEXT NOT NULL,
record_json TEXT NOT NULL,
- reply_root_uri TEXT,
+ reply_to_parent_uri TEXT,
+ reply_to_root_uri TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
PRIMARY KEY(did, tid)
);
-CREATE INDEX bsky_post_reply_root_uri_idx on bsky_post(reply_root_uri);
+CREATE INDEX bsky_post_reply_to_parent_uri_idx on bsky_post(reply_to_parent_uri);
+CREATE INDEX bsky_post_reply_to_root_uri_idx on bsky_post(reply_to_root_uri);
CREATE TABLE bsky_ref(
ref_type TEXT NOT NULL,
did TEXT NOT NULL,
tid TEXT NOT NULL,
subject_uri TEXT NOT NULL,
- subject_cid TEXT NOT NULL,
+ -- TODO: NOT NULL on subject_cid
+ subject_cid TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
PRIMARY KEY(ref_type, did, tid)
@@ -56,7 +59,8 @@ CREATE TABLE bsky_follow(
did TEXT NOT NULL,
tid TEXT NOT NULL,
subject_did TEXT NOT NULL,
- subject_cid TEXT NOT NULL,
+ -- TODO: NOT NULL on subject_cid
+ subject_cid TEXT,
created_at TIMESTAMP WITH TIME ZONE NOT NULL,
indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ),
PRIMARY KEY(did, tid)
@@ -68,7 +72,8 @@ CREATE TABLE bsky_notification(
pk INTEGER PRIMARY KEY AUTOINCREMENT,
user_did TEXT NOT NULL,
subject_uri TEXT NOT NULL,
- subject_cid TEXT NOT NULL,
+ -- TODO: NOT NULL on subject_cid
+ subject_cid TEXT,
reason TEXT NOT NULL,
seen_at TIMESTAMP WITH TIME ZONE,
indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') )
diff --git a/adenosine-pds/src/bsky.rs b/adenosine-pds/src/bsky.rs
index eeab9e3..66b05eb 100644
--- a/adenosine-pds/src/bsky.rs
+++ b/adenosine-pds/src/bsky.rs
@@ -91,14 +91,13 @@ pub fn bsky_get_profile(srv: &mut AtpService, did: &Did) -> Result<Profile> {
let mut stmt = srv
.atp_db
.conn
- .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND did = $1")?;
+ .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE did = $1")?;
let follows_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?;
- let uri = format!("at://{}", did);
let mut stmt = srv
.atp_db
.conn
- .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND uri = $1")?;
- let followers_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+ .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE subject_did = $1")?;
+ let followers_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?;
Ok(Profile {
did: did.to_string(),
handle: handle,
@@ -120,7 +119,7 @@ pub fn bsky_update_profile(srv: &mut AtpService, did: &Did, profile: ProfileReco
let prefix = "/app.bsky.actor.profile/";
for (mst_key, _cid) in full_map.iter() {
if mst_key.starts_with(&prefix) {
- profile_tid = Some(Tid::from_str(mst_key.split('/').nth(1).unwrap())?);
+ profile_tid = Some(Tid::from_str(mst_key.split('/').nth(2).unwrap())?);
}
}
let profile_tid: Tid = profile_tid.unwrap_or(srv.tid_gen.next_tid());
@@ -139,6 +138,7 @@ struct FeedRow {
pub item_handle: String,
pub item_post_tid: Tid,
pub item_post_cid: Cid,
+ pub indexed_at: String,
}
fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> {
@@ -149,45 +149,66 @@ fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> {
let item_post_tid = Tid::from_str(&item_post_tid)?;
let cid_string: String = row.get(3)?;
let item_post_cid = Cid::from_str(&cid_string)?;
+ let indexed_at: String = row.get(4)?;
Ok(FeedRow {
item_did,
item_handle,
item_post_tid,
item_post_cid,
+ indexed_at,
})
}
fn feed_row_to_item(srv: &mut AtpService, row: FeedRow) -> Result<FeedItem> {
let record_ipld = srv.repo.get_ipld(&row.item_post_cid)?;
+ let uri = format!(
+ "at://{}/{}/{}",
+ row.item_did, "app.bsky.feed.post", row.item_post_tid
+ );
+
+ let mut stmt = srv.atp_db.conn.prepare_cached(
+ "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'like' AND subject_uri = $1",
+ )?;
+ let like_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+
+ let mut stmt = srv.atp_db.conn.prepare_cached(
+ "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'repost' AND subject_uri = $1",
+ )?;
+ let repost_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+
+ let mut stmt = srv
+ .atp_db
+ .conn
+ .prepare_cached("SELECT COUNT(*) FROM bsky_post WHERE reply_to_parent_uri = $1")?;
+ let reply_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?;
+
let feed_item = FeedItem {
- uri: format!(
- "at://{}/{}/{}",
- row.item_did, "app.bsky.feed.post", row.item_post_tid
- ),
- cid: row.item_post_cid.to_string(),
+ uri: uri,
+ cid: Some(row.item_post_cid.to_string()),
author: User {
did: row.item_did.to_string(),
handle: row.item_handle,
- displayName: None, // TODO
+ displayName: None, // TODO: fetch from profile (or cache)
},
repostedBy: None,
record: ipld_into_json_value(record_ipld),
embed: None,
- replyCount: 0, // TODO
- repostCount: 0, // TODO
- likeCount: 0, // TODO
- indexedAt: "TODO".to_string(), // TODO
+ replyCount: reply_count,
+ repostCount: repost_count,
+ likeCount: like_count,
+ indexedAt: row.indexed_at,
myState: None,
};
Ok(feed_item)
}
-pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
+pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
let mut feed: Vec<FeedItem> = vec![];
+ // TODO: also handle reposts
let rows = {
let mut stmt = srv.atp_db
.conn
- .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;
+ .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 AND account.did IS NOT NULL ORDER BY bsky_post.tid DESC LIMIT 20")?;
let mut sql_rows = stmt.query(params!(did.to_string()))?;
let mut rows: Vec<FeedRow> = vec![];
while let Some(sql_row) = sql_rows.next()? {
@@ -202,12 +223,13 @@ pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFe
Ok(GenericFeed { feed })
}
-pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
+pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {
let mut feed: Vec<FeedItem> = vec![];
+ // TODO: also handle reposts
let rows = {
let mut stmt = srv.atp_db
.conn
- .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;
+ .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;
let mut sql_rows = stmt.query(params!(did.to_string()))?;
let mut rows: Vec<FeedRow> = vec![];
while let Some(sql_row) = sql_rows.next()? {
@@ -231,3 +253,315 @@ pub fn bsky_get_thread(
// start?
unimplemented!()
}
+
+#[test]
+fn test_bsky_profile() {
+ use crate::{create_account, created_at_now};
+ use libipld::ipld;
+
+ let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap();
+ let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap();
+
+ let mut srv = AtpService::new_ephemeral().unwrap();
+ let req = AccountRequest {
+ email: "test@bogus.com".to_string(),
+ handle: "handle.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ let did = Did::from_str(&session.did).unwrap();
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.did, session.did);
+ assert_eq!(profile.handle, req.handle);
+ assert_eq!(profile.displayName, None);
+ assert_eq!(profile.description, None);
+ assert_eq!(profile.followersCount, 0);
+ assert_eq!(profile.followsCount, 0);
+ assert_eq!(profile.postsCount, 0);
+
+ let record = ProfileRecord {
+ displayName: "Test Name".to_string(),
+ description: Some("short description".to_string()),
+ };
+ bsky_update_profile(&mut srv, &did, record.clone()).unwrap();
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.displayName, Some(record.displayName));
+ assert_eq!(profile.description, record.description);
+
+ let record = ProfileRecord {
+ displayName: "New Test Name".to_string(),
+ description: Some("longer description".to_string()),
+ };
+ bsky_update_profile(&mut srv, &did, record.clone()).unwrap();
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.displayName, Some(record.displayName));
+ assert_eq!(profile.description, record.description);
+
+ let mutations = vec![
+ Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": session.did}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": "did:web:external.domain"}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "first post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "second post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "third post"}),
+ ),
+ ];
+ srv.repo
+ .mutate_repo(&did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &did, mutations).unwrap();
+
+ let profile = bsky_get_profile(&mut srv, &did).unwrap();
+ assert_eq!(profile.followersCount, 1);
+ assert_eq!(profile.followsCount, 2);
+ assert_eq!(profile.postsCount, 3);
+}
+
+#[test]
+fn test_bsky_feeds() {
+ use crate::{create_account, created_at_now};
+ use libipld::ipld;
+
+ let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap();
+ let like_nsid = Nsid::from_str("app.bsky.feed.like").unwrap();
+ let repost_nsid = Nsid::from_str("app.bsky.feed.repost").unwrap();
+ let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap();
+
+ let mut srv = AtpService::new_ephemeral().unwrap();
+ let alice_did = {
+ let req = AccountRequest {
+ email: "alice@bogus.com".to_string(),
+ handle: "alice.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ Did::from_str(&session.did).unwrap()
+ };
+ let bob_did = {
+ let req = AccountRequest {
+ email: "bob@bogus.com".to_string(),
+ handle: "bob.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ Did::from_str(&session.did).unwrap()
+ };
+ let carol_did = {
+ let req = AccountRequest {
+ email: "carol@bogus.com".to_string(),
+ handle: "carol.test".to_string(),
+ password: "bogus".to_string(),
+ inviteCode: None,
+ recoveryKey: None,
+ };
+ let session = create_account(&mut srv, &req, true).unwrap();
+ Did::from_str(&session.did).unwrap()
+ };
+
+ // all feeds and timelines should be empty
+ let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap();
+ let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap();
+ assert!(alice_feed.feed.is_empty());
+ assert!(alice_timeline.feed.is_empty());
+ let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap();
+ let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap();
+ assert!(bob_feed.feed.is_empty());
+ assert!(bob_timeline.feed.is_empty());
+ let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap();
+ let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap();
+ assert!(carol_feed.feed.is_empty());
+ assert!(carol_timeline.feed.is_empty());
+
+ // alice does some posts
+ let alice_post1_tid = srv.tid_gen.next_tid();
+ let alice_post2_tid = srv.tid_gen.next_tid();
+ let alice_post3_tid = srv.tid_gen.next_tid();
+ assert!(alice_post1_tid < alice_post2_tid && alice_post2_tid < alice_post3_tid);
+ let mutations = vec![
+ Mutation::Create(
+ post_nsid.clone(),
+ alice_post1_tid.clone(),
+ ipld!({"text": "alice first post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ alice_post2_tid.clone(),
+ ipld!({"text": "alice second post"}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ alice_post3_tid.clone(),
+ ipld!({"text": "alice third post"}),
+ ),
+ ];
+ srv.repo
+ .mutate_repo(&alice_did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &alice_did, mutations).unwrap();
+
+ // bob follows alice, likes first post, reposts second, replies third
+ let alice_post3_uri = format!(
+ "at://{}/{}/{}",
+ alice_did.to_string(),
+ post_nsid.to_string(),
+ alice_post3_tid.to_string()
+ );
+ let mutations = vec![
+ Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": alice_did.to_string()}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ like_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string())}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ repost_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post2_tid.to_string())}, "createdAt": created_at_now()}),
+ ),
+ Mutation::Create(
+ post_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"text": "bob comment on alice post3", "reply": {"parent": {"uri": alice_post3_uri.clone()}, "root": {"uri": alice_post3_uri.clone()}}}),
+ ),
+ ];
+ srv.repo
+ .mutate_repo(&bob_did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &bob_did, mutations).unwrap();
+
+ // carol follows bob
+ let mutations = vec![Mutation::Create(
+ follow_nsid.clone(),
+ srv.tid_gen.next_tid(),
+ ipld!({"subject": {"did": bob_did.to_string()}, "createdAt": created_at_now()}),
+ )];
+ srv.repo
+ .mutate_repo(&bob_did, &mutations, &srv.pds_keypair)
+ .unwrap();
+ bsky_mutate_db(&mut srv.atp_db, &carol_did, mutations).unwrap();
+
+ // test alice profile: counts should be updated
+ let alice_profile = bsky_get_profile(&mut srv, &alice_did).unwrap();
+ assert_eq!(alice_profile.followersCount, 1);
+ assert_eq!(alice_profile.followsCount, 0);
+ assert_eq!(alice_profile.postsCount, 3);
+
+ // test alice timeline: still empty (?)
+ let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap();
+ println!("{:?}", alice_timeline);
+ assert!(alice_timeline.feed.is_empty());
+
+ // test alice feed: should have 3 posts, with correct counts
+ let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap();
+ assert_eq!(alice_feed.feed.len(), 3);
+
+ assert_eq!(
+ alice_feed.feed[2].uri,
+ format!(
+ "at://{}/{}/{}",
+ alice_did.to_string(),
+ post_nsid.to_string(),
+ alice_post1_tid.to_string()
+ )
+ );
+ // TODO: CID
+ assert_eq!(alice_feed.feed[2].author.did, alice_did.to_string());
+ assert_eq!(alice_feed.feed[2].author.handle, "alice.test");
+ assert_eq!(alice_feed.feed[2].repostedBy, None);
+ assert_eq!(
+ alice_feed.feed[2].record["text"].as_str().unwrap(),
+ "alice first post"
+ );
+ assert_eq!(alice_feed.feed[2].embed, None);
+ assert_eq!(alice_feed.feed[2].replyCount, 0);
+ assert_eq!(alice_feed.feed[2].repostCount, 0);
+ assert_eq!(alice_feed.feed[2].likeCount, 1);
+
+ assert_eq!(alice_feed.feed[1].author.did, alice_did.to_string());
+ assert_eq!(alice_feed.feed[1].replyCount, 0);
+ assert_eq!(alice_feed.feed[1].repostCount, 1);
+ assert_eq!(alice_feed.feed[1].likeCount, 0);
+
+ assert_eq!(alice_feed.feed[0].author.did, alice_did.to_string());
+ assert_eq!(alice_feed.feed[0].replyCount, 1);
+ assert_eq!(alice_feed.feed[0].repostCount, 0);
+ assert_eq!(alice_feed.feed[0].likeCount, 0);
+
+ // test bob timeline: should include alice posts
+ let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap();
+ println!("BOB TIMELINE ======");
+ for item in bob_timeline.feed.iter() {
+ println!("{:?}", item);
+ }
+ assert_eq!(bob_timeline.feed.len(), 3);
+ assert_eq!(
+ bob_timeline.feed[2].uri,
+ format!(
+ "at://{}/{}/{}",
+ alice_did.to_string(),
+ post_nsid.to_string(),
+ alice_post1_tid.to_string()
+ )
+ );
+ // TODO: CID
+ assert_eq!(bob_timeline.feed[2].author.did, alice_did.to_string());
+ assert_eq!(bob_timeline.feed[2].author.handle, "alice.test");
+
+ // test bob feed: should include repost and reply
+ let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap();
+ assert_eq!(bob_feed.feed.len(), 1);
+ // TODO: handle reposts
+ /*
+ assert_eq!(bob_feed.feed.len(), 2);
+ assert_eq!(bob_feed.feed[1].uri, format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string()));
+ // TODO: CID
+ assert_eq!(bob_feed.feed[1].author.did, alice_did.to_string());
+ assert_eq!(bob_feed.feed[1].author.handle, "alice.test");
+ assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().did, bob_did.to_string());
+ assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().handle, "bob.test");
+ // TODO: "is a repost" (check record?)
+ */
+
+ assert_eq!(bob_feed.feed[0].author.did, bob_did.to_string());
+ assert_eq!(bob_feed.feed[0].author.handle, "bob.test");
+
+ // test carol timeline: should include bob's repost and reply
+ let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap();
+ // TODO: handle re-posts (+1 here)
+ assert_eq!(carol_timeline.feed.len(), 1);
+ // TODO: details
+
+ // test carol feed: still empty
+ let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap();
+ assert!(carol_feed.feed.is_empty());
+}
+
+// TODO: post threads (include in the above test?)
diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs
index 384b498..051588d 100644
--- a/adenosine-pds/src/db.rs
+++ b/adenosine-pds/src/db.rs
@@ -1,6 +1,6 @@
use crate::models::{FollowRecord, Post, RefRecord};
/// ATP database (as distinct from blockstore)
-use crate::{ipld_into_json_value, AtpSession, Did, KeyPair, Tid};
+use crate::{created_at_now, ipld_into_json_value, AtpSession, Did, KeyPair, Tid};
use anyhow::{anyhow, Result};
use lazy_static::lazy_static;
use libipld::cbor::DagCborCodec;
@@ -167,15 +167,21 @@ impl AtpDatabase {
let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, &val)?;
let cid = *block.cid();
let post: Post = serde_json::from_value(ipld_into_json_value(val))?;
+ let (reply_to_parent_uri, reply_to_root_uri) = match post.reply {
+ Some(ref reply) => (Some(reply.parent.uri.clone()), Some(reply.root.uri.clone())),
+ None => (None, None),
+ };
let mut stmt = self
.conn
- .prepare_cached("INSERT INTO bsky_post (did, tid, cid, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)")?;
+ .prepare_cached("INSERT INTO bsky_post (did, tid, cid, reply_to_parent_uri, reply_to_root_uri, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)")?;
stmt.execute(params!(
did.to_string(),
tid.to_string(),
cid.to_string(),
+ reply_to_parent_uri,
+ reply_to_root_uri,
serde_json::to_string(&post)?,
- post.createdAt
+ post.createdAt.unwrap_or_else(|| created_at_now())
))?;
} else {
let mut stmt = self
diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs
index c7bb336..738a061 100644
--- a/adenosine-pds/src/lib.rs
+++ b/adenosine-pds/src/lib.rs
@@ -138,6 +138,16 @@ impl AtpService {
})
}
+ pub fn new_ephemeral() -> Result<Self> {
+ Ok(AtpService {
+ repo: RepoStore::open_ephemeral()?,
+ atp_db: AtpDatabase::open_ephemeral()?,
+ pds_keypair: KeyPair::new_random(),
+ tid_gen: TidLord::new(),
+ config: AtpServiceConfig::default(),
+ })
+ }
+
pub fn run_server(self) -> Result<()> {
let config = self.config.clone();
let srv = Mutex::new(self);
@@ -389,6 +399,29 @@ fn xrpc_get_handler(
};
Ok(json!(desc))
}
+ // =========== app.bsky methods
+ "app.bsky.actor.getProfile" => {
+ // TODO did or handle
+ let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
+ let mut srv = srv.lock().unwrap();
+ Ok(json!(bsky_get_profile(&mut srv, &did)?))
+ }
+ "app.bsky.feed.getAuthorFeed" => {
+ // TODO did or handle
+ let did = Did::from_str(&xrpc_required_param(request, "author")?)?;
+ let mut srv = srv.lock().unwrap();
+ Ok(json!(bsky_get_author_feed(&mut srv, &did)?))
+ }
+ "app.bsky.feed.getTimeline" => {
+ let mut srv = srv.lock().unwrap();
+ let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
+ Ok(json!(bsky_get_timeline(&mut srv, &auth_did)?))
+ }
+ "app.bsky.feed.getPostThread" => {
+ let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?;
+ let mut srv = srv.lock().unwrap();
+ Ok(json!(bsky_get_thread(&mut srv, &uri, None)?))
+ }
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: {}",
method
@@ -609,12 +642,6 @@ fn xrpc_post_handler(
Ok(json!({}))
}
// =========== app.bsky methods
- "app.bsky.actor.getProfile" => {
- // TODO did or handle
- let did = Did::from_str(&xrpc_required_param(request, "user")?)?;
- let mut srv = srv.lock().unwrap();
- Ok(json!(bsky_get_profile(&mut srv, &did)?))
- }
"app.bsky.actor.updateProfile" => {
let profile: ProfileRecord = rouille::input::json_input(request)?;
let mut srv = srv.lock().unwrap();
@@ -622,22 +649,6 @@ fn xrpc_post_handler(
bsky_update_profile(&mut srv, &auth_did, profile)?;
Ok(json!({}))
}
- "app.bsky.feed.getAuthorFeed" => {
- // TODO did or handle
- let did = Did::from_str(&xrpc_required_param(request, "author")?)?;
- let mut srv = srv.lock().unwrap();
- Ok(json!(bsky_get_author_feed(&mut srv, &did)?))
- }
- "app.bsky.feed.getTimeline" => {
- let mut srv = srv.lock().unwrap();
- let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?;
- Ok(json!(bsky_get_timeline(&mut srv, &auth_did)?))
- }
- "app.bsky.feed.getPostThread" => {
- let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?;
- let mut srv = srv.lock().unwrap();
- Ok(json!(bsky_get_thread(&mut srv, &uri, None)?))
- }
_ => Err(anyhow!(XrpcError::NotFound(format!(
"XRPC endpoint handler not found: {}",
method
@@ -810,3 +821,10 @@ fn record_handler(
}
.render()?)
}
+
+/// Helper to generate the current timestamp as right now, UTC, formatted as a string
+pub fn created_at_now() -> String {
+ let now = time::OffsetDateTime::now_utc();
+ now.format(&time::format_description::well_known::Rfc3339)
+ .unwrap()
+}
diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs
index 116ac53..0f47c2d 100644
--- a/adenosine-pds/src/models.rs
+++ b/adenosine-pds/src/models.rs
@@ -59,7 +59,8 @@ pub struct RepoBatchWrite {
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct Subject {
pub uri: String,
- pub cid: String,
+ // TODO: CID is required
+ pub cid: Option<String>,
}
/// Generic over Re-post and Like
@@ -122,7 +123,8 @@ pub struct User {
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct FeedItem {
pub uri: String,
- pub cid: String,
+ // TODO: cid is required
+ pub cid: Option<String>,
pub author: User,
pub repostedBy: Option<User>,
pub record: Value,
@@ -139,11 +141,19 @@ pub struct FeedItem {
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct Post {
pub text: String,
+ pub reply: Option<PostReply>,
pub createdAt: Option<String>,
}
#[allow(non_snake_case)]
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
+pub struct PostReply {
+ pub parent: Subject,
+ pub root: Subject,
+}
+
+#[allow(non_snake_case)]
+#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct PostThread {
pub thread: ThreadItem,
}
@@ -152,7 +162,8 @@ pub struct PostThread {
#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]
pub struct ThreadItem {
pub uri: String,
- pub cid: String,
+ // TODO: CID is required
+ pub cid: Option<String>,
pub author: User,
pub record: Value,
//pub embed?: RecordEmbed | ExternalEmbed | UnknownEmbed,