diff options
Diffstat (limited to 'adenosine-pds')
-rw-r--r-- | adenosine-pds/Cargo.toml | 1 | ||||
-rw-r--r-- | adenosine-pds/src/atp_db.sql | 15 | ||||
-rw-r--r-- | adenosine-pds/src/bsky.rs | 372 | ||||
-rw-r--r-- | adenosine-pds/src/db.rs | 12 | ||||
-rw-r--r-- | adenosine-pds/src/lib.rs | 62 | ||||
-rw-r--r-- | adenosine-pds/src/models.rs | 17 |
6 files changed, 427 insertions, 52 deletions
diff --git a/adenosine-pds/Cargo.toml b/adenosine-pds/Cargo.toml index 7075e65..1637164 100644 --- a/adenosine-pds/Cargo.toml +++ b/adenosine-pds/Cargo.toml @@ -44,6 +44,7 @@ bs58 = "*" async-trait = "*" dotenv = "*" askama = "0.11" +time = { version = "*", features = ["formatting"] } # for vendored iroh-car thiserror = "1.0" diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql index 71fbf6d..9a6e30c 100644 --- a/adenosine-pds/src/atp_db.sql +++ b/adenosine-pds/src/atp_db.sql @@ -33,19 +33,22 @@ CREATE TABLE bsky_post( tid TEXT NOT NULL, cid TEXT NOT NULL, record_json TEXT NOT NULL, - reply_root_uri TEXT, + reply_to_parent_uri TEXT, + reply_to_root_uri TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), PRIMARY KEY(did, tid) ); -CREATE INDEX bsky_post_reply_root_uri_idx on bsky_post(reply_root_uri); +CREATE INDEX bsky_post_reply_to_parent_uri_idx on bsky_post(reply_to_parent_uri); +CREATE INDEX bsky_post_reply_to_root_uri_idx on bsky_post(reply_to_root_uri); CREATE TABLE bsky_ref( ref_type TEXT NOT NULL, did TEXT NOT NULL, tid TEXT NOT NULL, subject_uri TEXT NOT NULL, - subject_cid TEXT NOT NULL, + -- TODO: NOT NULL on subject_cid + subject_cid TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), PRIMARY KEY(ref_type, did, tid) @@ -56,7 +59,8 @@ CREATE TABLE bsky_follow( did TEXT NOT NULL, tid TEXT NOT NULL, subject_did TEXT NOT NULL, - subject_cid TEXT NOT NULL, + -- TODO: NOT NULL on subject_cid + subject_cid TEXT, created_at TIMESTAMP WITH TIME ZONE NOT NULL, indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ), PRIMARY KEY(did, tid) @@ -68,7 +72,8 @@ CREATE TABLE bsky_notification( pk INTEGER PRIMARY KEY AUTOINCREMENT, user_did TEXT NOT NULL, subject_uri TEXT NOT NULL, - subject_cid TEXT NOT NULL, + -- TODO: NOT NULL on subject_cid + subject_cid TEXT, reason TEXT NOT NULL, seen_at TIMESTAMP WITH TIME ZONE, indexed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ) diff --git a/adenosine-pds/src/bsky.rs b/adenosine-pds/src/bsky.rs index eeab9e3..66b05eb 100644 --- a/adenosine-pds/src/bsky.rs +++ b/adenosine-pds/src/bsky.rs @@ -91,14 +91,13 @@ pub fn bsky_get_profile(srv: &mut AtpService, did: &Did) -> Result<Profile> { let mut stmt = srv .atp_db .conn - .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND did = $1")?; + .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE did = $1")?; let follows_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; - let uri = format!("at://{}", did); let mut stmt = srv .atp_db .conn - .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND uri = $1")?; - let followers_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE subject_did = $1")?; + let followers_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; Ok(Profile { did: did.to_string(), handle: handle, @@ -120,7 +119,7 @@ pub fn bsky_update_profile(srv: &mut AtpService, did: &Did, profile: ProfileReco let prefix = "/app.bsky.actor.profile/"; for (mst_key, _cid) in full_map.iter() { if mst_key.starts_with(&prefix) { - profile_tid = Some(Tid::from_str(mst_key.split('/').nth(1).unwrap())?); + profile_tid = Some(Tid::from_str(mst_key.split('/').nth(2).unwrap())?); } } let profile_tid: Tid = profile_tid.unwrap_or(srv.tid_gen.next_tid()); @@ -139,6 +138,7 @@ struct FeedRow { pub item_handle: String, pub item_post_tid: Tid, pub item_post_cid: Cid, + pub indexed_at: String, } fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> { @@ -149,45 +149,66 @@ fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> { let item_post_tid = Tid::from_str(&item_post_tid)?; let cid_string: String = row.get(3)?; let item_post_cid = Cid::from_str(&cid_string)?; + let indexed_at: String = row.get(4)?; Ok(FeedRow { item_did, item_handle, item_post_tid, item_post_cid, + indexed_at, }) } fn feed_row_to_item(srv: &mut AtpService, row: FeedRow) -> Result<FeedItem> { let record_ipld = srv.repo.get_ipld(&row.item_post_cid)?; + let uri = format!( + "at://{}/{}/{}", + row.item_did, "app.bsky.feed.post", row.item_post_tid + ); + + let mut stmt = srv.atp_db.conn.prepare_cached( + "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'like' AND subject_uri = $1", + )?; + let like_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + + let mut stmt = srv.atp_db.conn.prepare_cached( + "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'repost' AND subject_uri = $1", + )?; + let repost_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + + let mut stmt = srv + .atp_db + .conn + .prepare_cached("SELECT COUNT(*) FROM bsky_post WHERE reply_to_parent_uri = $1")?; + let reply_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + let feed_item = FeedItem { - uri: format!( - "at://{}/{}/{}", - row.item_did, "app.bsky.feed.post", row.item_post_tid - ), - cid: row.item_post_cid.to_string(), + uri: uri, + cid: Some(row.item_post_cid.to_string()), author: User { did: row.item_did.to_string(), handle: row.item_handle, - displayName: None, // TODO + displayName: None, // TODO: fetch from profile (or cache) }, repostedBy: None, record: ipld_into_json_value(record_ipld), embed: None, - replyCount: 0, // TODO - repostCount: 0, // TODO - likeCount: 0, // TODO - indexedAt: "TODO".to_string(), // TODO + replyCount: reply_count, + repostCount: repost_count, + likeCount: like_count, + indexedAt: row.indexed_at, myState: None, }; Ok(feed_item) } -pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> { +pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> { let mut feed: Vec<FeedItem> = vec![]; + // TODO: also handle reposts let rows = { let mut stmt = srv.atp_db .conn - .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; + .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 AND account.did IS NOT NULL ORDER BY bsky_post.tid DESC LIMIT 20")?; let mut sql_rows = stmt.query(params!(did.to_string()))?; let mut rows: Vec<FeedRow> = vec![]; while let Some(sql_row) = sql_rows.next()? { @@ -202,12 +223,13 @@ pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFe Ok(GenericFeed { feed }) } -pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> { +pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> { let mut feed: Vec<FeedItem> = vec![]; + // TODO: also handle reposts let rows = { let mut stmt = srv.atp_db .conn - .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; + .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; let mut sql_rows = stmt.query(params!(did.to_string()))?; let mut rows: Vec<FeedRow> = vec![]; while let Some(sql_row) = sql_rows.next()? { @@ -231,3 +253,315 @@ pub fn bsky_get_thread( // start? unimplemented!() } + +#[test] +fn test_bsky_profile() { + use crate::{create_account, created_at_now}; + use libipld::ipld; + + let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap(); + let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap(); + + let mut srv = AtpService::new_ephemeral().unwrap(); + let req = AccountRequest { + email: "test@bogus.com".to_string(), + handle: "handle.test".to_string(), + password: "bogus".to_string(), + inviteCode: None, + recoveryKey: None, + }; + let session = create_account(&mut srv, &req, true).unwrap(); + let did = Did::from_str(&session.did).unwrap(); + let profile = bsky_get_profile(&mut srv, &did).unwrap(); + assert_eq!(profile.did, session.did); + assert_eq!(profile.handle, req.handle); + assert_eq!(profile.displayName, None); + assert_eq!(profile.description, None); + assert_eq!(profile.followersCount, 0); + assert_eq!(profile.followsCount, 0); + assert_eq!(profile.postsCount, 0); + + let record = ProfileRecord { + displayName: "Test Name".to_string(), + description: Some("short description".to_string()), + }; + bsky_update_profile(&mut srv, &did, record.clone()).unwrap(); + let profile = bsky_get_profile(&mut srv, &did).unwrap(); + assert_eq!(profile.displayName, Some(record.displayName)); + assert_eq!(profile.description, record.description); + + let record = ProfileRecord { + displayName: "New Test Name".to_string(), + description: Some("longer description".to_string()), + }; + bsky_update_profile(&mut srv, &did, record.clone()).unwrap(); + let profile = bsky_get_profile(&mut srv, &did).unwrap(); + assert_eq!(profile.displayName, Some(record.displayName)); + assert_eq!(profile.description, record.description); + + let mutations = vec![ + Mutation::Create( + follow_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"subject": {"did": session.did}, "createdAt": created_at_now()}), + ), + Mutation::Create( + follow_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"subject": {"did": "did:web:external.domain"}, "createdAt": created_at_now()}), + ), + Mutation::Create( + post_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"text": "first post"}), + ), + Mutation::Create( + post_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"text": "second post"}), + ), + Mutation::Create( + post_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"text": "third post"}), + ), + ]; + srv.repo + .mutate_repo(&did, &mutations, &srv.pds_keypair) + .unwrap(); + bsky_mutate_db(&mut srv.atp_db, &did, mutations).unwrap(); + + let profile = bsky_get_profile(&mut srv, &did).unwrap(); + assert_eq!(profile.followersCount, 1); + assert_eq!(profile.followsCount, 2); + assert_eq!(profile.postsCount, 3); +} + +#[test] +fn test_bsky_feeds() { + use crate::{create_account, created_at_now}; + use libipld::ipld; + + let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap(); + let like_nsid = Nsid::from_str("app.bsky.feed.like").unwrap(); + let repost_nsid = Nsid::from_str("app.bsky.feed.repost").unwrap(); + let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap(); + + let mut srv = AtpService::new_ephemeral().unwrap(); + let alice_did = { + let req = AccountRequest { + email: "alice@bogus.com".to_string(), + handle: "alice.test".to_string(), + password: "bogus".to_string(), + inviteCode: None, + recoveryKey: None, + }; + let session = create_account(&mut srv, &req, true).unwrap(); + Did::from_str(&session.did).unwrap() + }; + let bob_did = { + let req = AccountRequest { + email: "bob@bogus.com".to_string(), + handle: "bob.test".to_string(), + password: "bogus".to_string(), + inviteCode: None, + recoveryKey: None, + }; + let session = create_account(&mut srv, &req, true).unwrap(); + Did::from_str(&session.did).unwrap() + }; + let carol_did = { + let req = AccountRequest { + email: "carol@bogus.com".to_string(), + handle: "carol.test".to_string(), + password: "bogus".to_string(), + inviteCode: None, + recoveryKey: None, + }; + let session = create_account(&mut srv, &req, true).unwrap(); + Did::from_str(&session.did).unwrap() + }; + + // all feeds and timelines should be empty + let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap(); + let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap(); + assert!(alice_feed.feed.is_empty()); + assert!(alice_timeline.feed.is_empty()); + let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap(); + let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap(); + assert!(bob_feed.feed.is_empty()); + assert!(bob_timeline.feed.is_empty()); + let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap(); + let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap(); + assert!(carol_feed.feed.is_empty()); + assert!(carol_timeline.feed.is_empty()); + + // alice does some posts + let alice_post1_tid = srv.tid_gen.next_tid(); + let alice_post2_tid = srv.tid_gen.next_tid(); + let alice_post3_tid = srv.tid_gen.next_tid(); + assert!(alice_post1_tid < alice_post2_tid && alice_post2_tid < alice_post3_tid); + let mutations = vec![ + Mutation::Create( + post_nsid.clone(), + alice_post1_tid.clone(), + ipld!({"text": "alice first post"}), + ), + Mutation::Create( + post_nsid.clone(), + alice_post2_tid.clone(), + ipld!({"text": "alice second post"}), + ), + Mutation::Create( + post_nsid.clone(), + alice_post3_tid.clone(), + ipld!({"text": "alice third post"}), + ), + ]; + srv.repo + .mutate_repo(&alice_did, &mutations, &srv.pds_keypair) + .unwrap(); + bsky_mutate_db(&mut srv.atp_db, &alice_did, mutations).unwrap(); + + // bob follows alice, likes first post, reposts second, replies third + let alice_post3_uri = format!( + "at://{}/{}/{}", + alice_did.to_string(), + post_nsid.to_string(), + alice_post3_tid.to_string() + ); + let mutations = vec![ + Mutation::Create( + follow_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"subject": {"did": alice_did.to_string()}, "createdAt": created_at_now()}), + ), + Mutation::Create( + like_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string())}, "createdAt": created_at_now()}), + ), + Mutation::Create( + repost_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post2_tid.to_string())}, "createdAt": created_at_now()}), + ), + Mutation::Create( + post_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"text": "bob comment on alice post3", "reply": {"parent": {"uri": alice_post3_uri.clone()}, "root": {"uri": alice_post3_uri.clone()}}}), + ), + ]; + srv.repo + .mutate_repo(&bob_did, &mutations, &srv.pds_keypair) + .unwrap(); + bsky_mutate_db(&mut srv.atp_db, &bob_did, mutations).unwrap(); + + // carol follows bob + let mutations = vec![Mutation::Create( + follow_nsid.clone(), + srv.tid_gen.next_tid(), + ipld!({"subject": {"did": bob_did.to_string()}, "createdAt": created_at_now()}), + )]; + srv.repo + .mutate_repo(&bob_did, &mutations, &srv.pds_keypair) + .unwrap(); + bsky_mutate_db(&mut srv.atp_db, &carol_did, mutations).unwrap(); + + // test alice profile: counts should be updated + let alice_profile = bsky_get_profile(&mut srv, &alice_did).unwrap(); + assert_eq!(alice_profile.followersCount, 1); + assert_eq!(alice_profile.followsCount, 0); + assert_eq!(alice_profile.postsCount, 3); + + // test alice timeline: still empty (?) + let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap(); + println!("{:?}", alice_timeline); + assert!(alice_timeline.feed.is_empty()); + + // test alice feed: should have 3 posts, with correct counts + let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap(); + assert_eq!(alice_feed.feed.len(), 3); + + assert_eq!( + alice_feed.feed[2].uri, + format!( + "at://{}/{}/{}", + alice_did.to_string(), + post_nsid.to_string(), + alice_post1_tid.to_string() + ) + ); + // TODO: CID + assert_eq!(alice_feed.feed[2].author.did, alice_did.to_string()); + assert_eq!(alice_feed.feed[2].author.handle, "alice.test"); + assert_eq!(alice_feed.feed[2].repostedBy, None); + assert_eq!( + alice_feed.feed[2].record["text"].as_str().unwrap(), + "alice first post" + ); + assert_eq!(alice_feed.feed[2].embed, None); + assert_eq!(alice_feed.feed[2].replyCount, 0); + assert_eq!(alice_feed.feed[2].repostCount, 0); + assert_eq!(alice_feed.feed[2].likeCount, 1); + + assert_eq!(alice_feed.feed[1].author.did, alice_did.to_string()); + assert_eq!(alice_feed.feed[1].replyCount, 0); + assert_eq!(alice_feed.feed[1].repostCount, 1); + assert_eq!(alice_feed.feed[1].likeCount, 0); + + assert_eq!(alice_feed.feed[0].author.did, alice_did.to_string()); + assert_eq!(alice_feed.feed[0].replyCount, 1); + assert_eq!(alice_feed.feed[0].repostCount, 0); + assert_eq!(alice_feed.feed[0].likeCount, 0); + + // test bob timeline: should include alice posts + let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap(); + println!("BOB TIMELINE ======"); + for item in bob_timeline.feed.iter() { + println!("{:?}", item); + } + assert_eq!(bob_timeline.feed.len(), 3); + assert_eq!( + bob_timeline.feed[2].uri, + format!( + "at://{}/{}/{}", + alice_did.to_string(), + post_nsid.to_string(), + alice_post1_tid.to_string() + ) + ); + // TODO: CID + assert_eq!(bob_timeline.feed[2].author.did, alice_did.to_string()); + assert_eq!(bob_timeline.feed[2].author.handle, "alice.test"); + + // test bob feed: should include repost and reply + let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap(); + assert_eq!(bob_feed.feed.len(), 1); + // TODO: handle reposts + /* + assert_eq!(bob_feed.feed.len(), 2); + assert_eq!(bob_feed.feed[1].uri, format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string())); + // TODO: CID + assert_eq!(bob_feed.feed[1].author.did, alice_did.to_string()); + assert_eq!(bob_feed.feed[1].author.handle, "alice.test"); + assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().did, bob_did.to_string()); + assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().handle, "bob.test"); + // TODO: "is a repost" (check record?) + */ + + assert_eq!(bob_feed.feed[0].author.did, bob_did.to_string()); + assert_eq!(bob_feed.feed[0].author.handle, "bob.test"); + + // test carol timeline: should include bob's repost and reply + let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap(); + // TODO: handle re-posts (+1 here) + assert_eq!(carol_timeline.feed.len(), 1); + // TODO: details + + // test carol feed: still empty + let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap(); + assert!(carol_feed.feed.is_empty()); +} + +// TODO: post threads (include in the above test?) diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 384b498..051588d 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -1,6 +1,6 @@ use crate::models::{FollowRecord, Post, RefRecord}; /// ATP database (as distinct from blockstore) -use crate::{ipld_into_json_value, AtpSession, Did, KeyPair, Tid}; +use crate::{created_at_now, ipld_into_json_value, AtpSession, Did, KeyPair, Tid}; use anyhow::{anyhow, Result}; use lazy_static::lazy_static; use libipld::cbor::DagCborCodec; @@ -167,15 +167,21 @@ impl AtpDatabase { let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, &val)?; let cid = *block.cid(); let post: Post = serde_json::from_value(ipld_into_json_value(val))?; + let (reply_to_parent_uri, reply_to_root_uri) = match post.reply { + Some(ref reply) => (Some(reply.parent.uri.clone()), Some(reply.root.uri.clone())), + None => (None, None), + }; let mut stmt = self .conn - .prepare_cached("INSERT INTO bsky_post (did, tid, cid, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)")?; + .prepare_cached("INSERT INTO bsky_post (did, tid, cid, reply_to_parent_uri, reply_to_root_uri, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)")?; stmt.execute(params!( did.to_string(), tid.to_string(), cid.to_string(), + reply_to_parent_uri, + reply_to_root_uri, serde_json::to_string(&post)?, - post.createdAt + post.createdAt.unwrap_or_else(|| created_at_now()) ))?; } else { let mut stmt = self diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index c7bb336..738a061 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -138,6 +138,16 @@ impl AtpService { }) } + pub fn new_ephemeral() -> Result<Self> { + Ok(AtpService { + repo: RepoStore::open_ephemeral()?, + atp_db: AtpDatabase::open_ephemeral()?, + pds_keypair: KeyPair::new_random(), + tid_gen: TidLord::new(), + config: AtpServiceConfig::default(), + }) + } + pub fn run_server(self) -> Result<()> { let config = self.config.clone(); let srv = Mutex::new(self); @@ -389,6 +399,29 @@ fn xrpc_get_handler( }; Ok(json!(desc)) } + // =========== app.bsky methods + "app.bsky.actor.getProfile" => { + // TODO did or handle + let did = Did::from_str(&xrpc_required_param(request, "user")?)?; + let mut srv = srv.lock().unwrap(); + Ok(json!(bsky_get_profile(&mut srv, &did)?)) + } + "app.bsky.feed.getAuthorFeed" => { + // TODO did or handle + let did = Did::from_str(&xrpc_required_param(request, "author")?)?; + let mut srv = srv.lock().unwrap(); + Ok(json!(bsky_get_author_feed(&mut srv, &did)?)) + } + "app.bsky.feed.getTimeline" => { + let mut srv = srv.lock().unwrap(); + let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?; + Ok(json!(bsky_get_timeline(&mut srv, &auth_did)?)) + } + "app.bsky.feed.getPostThread" => { + let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?; + let mut srv = srv.lock().unwrap(); + Ok(json!(bsky_get_thread(&mut srv, &uri, None)?)) + } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: {}", method @@ -609,12 +642,6 @@ fn xrpc_post_handler( Ok(json!({})) } // =========== app.bsky methods - "app.bsky.actor.getProfile" => { - // TODO did or handle - let did = Did::from_str(&xrpc_required_param(request, "user")?)?; - let mut srv = srv.lock().unwrap(); - Ok(json!(bsky_get_profile(&mut srv, &did)?)) - } "app.bsky.actor.updateProfile" => { let profile: ProfileRecord = rouille::input::json_input(request)?; let mut srv = srv.lock().unwrap(); @@ -622,22 +649,6 @@ fn xrpc_post_handler( bsky_update_profile(&mut srv, &auth_did, profile)?; Ok(json!({})) } - "app.bsky.feed.getAuthorFeed" => { - // TODO did or handle - let did = Did::from_str(&xrpc_required_param(request, "author")?)?; - let mut srv = srv.lock().unwrap(); - Ok(json!(bsky_get_author_feed(&mut srv, &did)?)) - } - "app.bsky.feed.getTimeline" => { - let mut srv = srv.lock().unwrap(); - let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?; - Ok(json!(bsky_get_timeline(&mut srv, &auth_did)?)) - } - "app.bsky.feed.getPostThread" => { - let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?; - let mut srv = srv.lock().unwrap(); - Ok(json!(bsky_get_thread(&mut srv, &uri, None)?)) - } _ => Err(anyhow!(XrpcError::NotFound(format!( "XRPC endpoint handler not found: {}", method @@ -810,3 +821,10 @@ fn record_handler( } .render()?) } + +/// Helper to generate the current timestamp as right now, UTC, formatted as a string +pub fn created_at_now() -> String { + let now = time::OffsetDateTime::now_utc(); + now.format(&time::format_description::well_known::Rfc3339) + .unwrap() +} diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs index 116ac53..0f47c2d 100644 --- a/adenosine-pds/src/models.rs +++ b/adenosine-pds/src/models.rs @@ -59,7 +59,8 @@ pub struct RepoBatchWrite { #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct Subject { pub uri: String, - pub cid: String, + // TODO: CID is required + pub cid: Option<String>, } /// Generic over Re-post and Like @@ -122,7 +123,8 @@ pub struct User { #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct FeedItem { pub uri: String, - pub cid: String, + // TODO: cid is required + pub cid: Option<String>, pub author: User, pub repostedBy: Option<User>, pub record: Value, @@ -139,11 +141,19 @@ pub struct FeedItem { #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct Post { pub text: String, + pub reply: Option<PostReply>, pub createdAt: Option<String>, } #[allow(non_snake_case)] #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct PostReply { + pub parent: Subject, + pub root: Subject, +} + +#[allow(non_snake_case)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct PostThread { pub thread: ThreadItem, } @@ -152,7 +162,8 @@ pub struct PostThread { #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] pub struct ThreadItem { pub uri: String, - pub cid: String, + // TODO: CID is required + pub cid: Option<String>, pub author: User, pub record: Value, //pub embed?: RecordEmbed | ExternalEmbed | UnknownEmbed, |