diff options
Diffstat (limited to 'adenosine-pds')
| -rw-r--r-- | adenosine-pds/Cargo.toml | 1 | ||||
| -rw-r--r-- | adenosine-pds/src/atp_db.sql | 15 | ||||
| -rw-r--r-- | adenosine-pds/src/bsky.rs | 372 | ||||
| -rw-r--r-- | adenosine-pds/src/db.rs | 12 | ||||
| -rw-r--r-- | adenosine-pds/src/lib.rs | 62 | ||||
| -rw-r--r-- | adenosine-pds/src/models.rs | 17 | 
6 files changed, 427 insertions, 52 deletions
| diff --git a/adenosine-pds/Cargo.toml b/adenosine-pds/Cargo.toml index 7075e65..1637164 100644 --- a/adenosine-pds/Cargo.toml +++ b/adenosine-pds/Cargo.toml @@ -44,6 +44,7 @@ bs58 = "*"  async-trait = "*"  dotenv = "*"  askama = "0.11" +time = { version = "*", features = ["formatting"] }  # for vendored iroh-car  thiserror = "1.0" diff --git a/adenosine-pds/src/atp_db.sql b/adenosine-pds/src/atp_db.sql index 71fbf6d..9a6e30c 100644 --- a/adenosine-pds/src/atp_db.sql +++ b/adenosine-pds/src/atp_db.sql @@ -33,19 +33,22 @@ CREATE TABLE bsky_post(      tid                 TEXT NOT NULL,      cid                 TEXT NOT NULL,      record_json         TEXT NOT NULL, -    reply_root_uri      TEXT, +    reply_to_parent_uri    TEXT, +    reply_to_root_uri      TEXT,      created_at          TIMESTAMP WITH TIME ZONE  NOT NULL,      indexed_at          TIMESTAMP WITH TIME ZONE  NOT NULL DEFAULT ( DATETIME('now') ),      PRIMARY KEY(did, tid)  ); -CREATE INDEX bsky_post_reply_root_uri_idx on bsky_post(reply_root_uri); +CREATE INDEX bsky_post_reply_to_parent_uri_idx on bsky_post(reply_to_parent_uri); +CREATE INDEX bsky_post_reply_to_root_uri_idx on bsky_post(reply_to_root_uri);  CREATE TABLE bsky_ref(      ref_type            TEXT NOT NULL,      did                 TEXT NOT NULL,      tid                 TEXT NOT NULL,      subject_uri         TEXT NOT NULL, -    subject_cid         TEXT NOT NULL, +    -- TODO: NOT NULL on subject_cid +    subject_cid         TEXT,      created_at          TIMESTAMP WITH TIME ZONE  NOT NULL,      indexed_at          TIMESTAMP WITH TIME ZONE  NOT NULL DEFAULT ( DATETIME('now') ),      PRIMARY KEY(ref_type, did, tid) @@ -56,7 +59,8 @@ CREATE TABLE bsky_follow(      did                 TEXT NOT NULL,      tid                 TEXT NOT NULL,      subject_did         TEXT NOT NULL, -    subject_cid         TEXT NOT NULL, +    -- TODO: NOT NULL on subject_cid +    subject_cid         TEXT,      created_at          TIMESTAMP WITH TIME ZONE  NOT NULL,      indexed_at          TIMESTAMP WITH TIME ZONE  NOT NULL DEFAULT ( DATETIME('now') ),      PRIMARY KEY(did, tid) @@ -68,7 +72,8 @@ CREATE TABLE bsky_notification(      pk                  INTEGER PRIMARY KEY AUTOINCREMENT,      user_did            TEXT NOT NULL,      subject_uri         TEXT NOT NULL, -    subject_cid         TEXT NOT NULL, +    -- TODO: NOT NULL on subject_cid +    subject_cid         TEXT,      reason              TEXT NOT NULL,      seen_at             TIMESTAMP WITH TIME ZONE,      indexed_at          TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT ( DATETIME('now') ) diff --git a/adenosine-pds/src/bsky.rs b/adenosine-pds/src/bsky.rs index eeab9e3..66b05eb 100644 --- a/adenosine-pds/src/bsky.rs +++ b/adenosine-pds/src/bsky.rs @@ -91,14 +91,13 @@ pub fn bsky_get_profile(srv: &mut AtpService, did: &Did) -> Result<Profile> {      let mut stmt = srv          .atp_db          .conn -        .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND did = $1")?; +        .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE did = $1")?;      let follows_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?; -    let uri = format!("at://{}", did);      let mut stmt = srv          .atp_db          .conn -        .prepare_cached("SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'follow' AND uri = $1")?; -    let followers_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; +        .prepare_cached("SELECT COUNT(*) FROM bsky_follow WHERE subject_did = $1")?; +    let followers_count: u64 = stmt.query_row(params!(did.to_string()), |row| row.get(0))?;      Ok(Profile {          did: did.to_string(),          handle: handle, @@ -120,7 +119,7 @@ pub fn bsky_update_profile(srv: &mut AtpService, did: &Did, profile: ProfileReco      let prefix = "/app.bsky.actor.profile/";      for (mst_key, _cid) in full_map.iter() {          if mst_key.starts_with(&prefix) { -            profile_tid = Some(Tid::from_str(mst_key.split('/').nth(1).unwrap())?); +            profile_tid = Some(Tid::from_str(mst_key.split('/').nth(2).unwrap())?);          }      }      let profile_tid: Tid = profile_tid.unwrap_or(srv.tid_gen.next_tid()); @@ -139,6 +138,7 @@ struct FeedRow {      pub item_handle: String,      pub item_post_tid: Tid,      pub item_post_cid: Cid, +    pub indexed_at: String,  }  fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> { @@ -149,45 +149,66 @@ fn feed_row(row: &rusqlite::Row) -> Result<FeedRow> {      let item_post_tid = Tid::from_str(&item_post_tid)?;      let cid_string: String = row.get(3)?;      let item_post_cid = Cid::from_str(&cid_string)?; +    let indexed_at: String = row.get(4)?;      Ok(FeedRow {          item_did,          item_handle,          item_post_tid,          item_post_cid, +        indexed_at,      })  }  fn feed_row_to_item(srv: &mut AtpService, row: FeedRow) -> Result<FeedItem> {      let record_ipld = srv.repo.get_ipld(&row.item_post_cid)?; +    let uri = format!( +        "at://{}/{}/{}", +        row.item_did, "app.bsky.feed.post", row.item_post_tid +    ); + +    let mut stmt = srv.atp_db.conn.prepare_cached( +        "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'like' AND subject_uri = $1", +    )?; +    let like_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + +    let mut stmt = srv.atp_db.conn.prepare_cached( +        "SELECT COUNT(*) FROM bsky_ref WHERE ref_type = 'repost' AND subject_uri = $1", +    )?; +    let repost_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; + +    let mut stmt = srv +        .atp_db +        .conn +        .prepare_cached("SELECT COUNT(*) FROM bsky_post WHERE reply_to_parent_uri = $1")?; +    let reply_count: u64 = stmt.query_row(params!(uri), |row| row.get(0))?; +      let feed_item = FeedItem { -        uri: format!( -            "at://{}/{}/{}", -            row.item_did, "app.bsky.feed.post", row.item_post_tid -        ), -        cid: row.item_post_cid.to_string(), +        uri: uri, +        cid: Some(row.item_post_cid.to_string()),          author: User {              did: row.item_did.to_string(),              handle: row.item_handle, -            displayName: None, // TODO +            displayName: None, // TODO: fetch from profile (or cache)          },          repostedBy: None,          record: ipld_into_json_value(record_ipld),          embed: None, -        replyCount: 0,                 // TODO -        repostCount: 0,                // TODO -        likeCount: 0,                  // TODO -        indexedAt: "TODO".to_string(), // TODO +        replyCount: reply_count, +        repostCount: repost_count, +        likeCount: like_count, +        indexedAt: row.indexed_at,          myState: None,      };      Ok(feed_item)  } -pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> { +pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {      let mut feed: Vec<FeedItem> = vec![]; +    // TODO: also handle reposts      let rows = {          let mut stmt = srv.atp_db              .conn -            .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; +            .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did LEFT JOIN bsky_follow ON bsky_post.did = bsky_follow.subject_did WHERE bsky_follow.did = ?1 AND account.did IS NOT NULL ORDER BY bsky_post.tid DESC LIMIT 20")?;          let mut sql_rows = stmt.query(params!(did.to_string()))?;          let mut rows: Vec<FeedRow> = vec![];          while let Some(sql_row) = sql_rows.next()? { @@ -202,12 +223,13 @@ pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFe      Ok(GenericFeed { feed })  } -pub fn bsky_get_timeline(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> { +pub fn bsky_get_author_feed(srv: &mut AtpService, did: &Did) -> Result<GenericFeed> {      let mut feed: Vec<FeedItem> = vec![]; +    // TODO: also handle reposts      let rows = {          let mut stmt = srv.atp_db              .conn -            .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?; +            .prepare_cached("SELECT account.did, account.handle, bsky_post.tid, bsky_post.cid, bsky_post.indexed_at FROM bsky_post LEFT JOIN account ON bsky_post.did = account.did WHERE bsky_post.did = ?1 ORDER BY bsky_post.tid DESC LIMIT 20")?;          let mut sql_rows = stmt.query(params!(did.to_string()))?;          let mut rows: Vec<FeedRow> = vec![];          while let Some(sql_row) = sql_rows.next()? { @@ -231,3 +253,315 @@ pub fn bsky_get_thread(      // start?      unimplemented!()  } + +#[test] +fn test_bsky_profile() { +    use crate::{create_account, created_at_now}; +    use libipld::ipld; + +    let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap(); +    let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap(); + +    let mut srv = AtpService::new_ephemeral().unwrap(); +    let req = AccountRequest { +        email: "test@bogus.com".to_string(), +        handle: "handle.test".to_string(), +        password: "bogus".to_string(), +        inviteCode: None, +        recoveryKey: None, +    }; +    let session = create_account(&mut srv, &req, true).unwrap(); +    let did = Did::from_str(&session.did).unwrap(); +    let profile = bsky_get_profile(&mut srv, &did).unwrap(); +    assert_eq!(profile.did, session.did); +    assert_eq!(profile.handle, req.handle); +    assert_eq!(profile.displayName, None); +    assert_eq!(profile.description, None); +    assert_eq!(profile.followersCount, 0); +    assert_eq!(profile.followsCount, 0); +    assert_eq!(profile.postsCount, 0); + +    let record = ProfileRecord { +        displayName: "Test Name".to_string(), +        description: Some("short description".to_string()), +    }; +    bsky_update_profile(&mut srv, &did, record.clone()).unwrap(); +    let profile = bsky_get_profile(&mut srv, &did).unwrap(); +    assert_eq!(profile.displayName, Some(record.displayName)); +    assert_eq!(profile.description, record.description); + +    let record = ProfileRecord { +        displayName: "New Test Name".to_string(), +        description: Some("longer description".to_string()), +    }; +    bsky_update_profile(&mut srv, &did, record.clone()).unwrap(); +    let profile = bsky_get_profile(&mut srv, &did).unwrap(); +    assert_eq!(profile.displayName, Some(record.displayName)); +    assert_eq!(profile.description, record.description); + +    let mutations = vec![ +        Mutation::Create( +            follow_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"subject": {"did": session.did}, "createdAt": created_at_now()}), +        ), +        Mutation::Create( +            follow_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"subject": {"did": "did:web:external.domain"}, "createdAt": created_at_now()}), +        ), +        Mutation::Create( +            post_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"text": "first post"}), +        ), +        Mutation::Create( +            post_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"text": "second post"}), +        ), +        Mutation::Create( +            post_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"text": "third post"}), +        ), +    ]; +    srv.repo +        .mutate_repo(&did, &mutations, &srv.pds_keypair) +        .unwrap(); +    bsky_mutate_db(&mut srv.atp_db, &did, mutations).unwrap(); + +    let profile = bsky_get_profile(&mut srv, &did).unwrap(); +    assert_eq!(profile.followersCount, 1); +    assert_eq!(profile.followsCount, 2); +    assert_eq!(profile.postsCount, 3); +} + +#[test] +fn test_bsky_feeds() { +    use crate::{create_account, created_at_now}; +    use libipld::ipld; + +    let post_nsid = Nsid::from_str("app.bsky.feed.post").unwrap(); +    let like_nsid = Nsid::from_str("app.bsky.feed.like").unwrap(); +    let repost_nsid = Nsid::from_str("app.bsky.feed.repost").unwrap(); +    let follow_nsid = Nsid::from_str("app.bsky.graph.follow").unwrap(); + +    let mut srv = AtpService::new_ephemeral().unwrap(); +    let alice_did = { +        let req = AccountRequest { +            email: "alice@bogus.com".to_string(), +            handle: "alice.test".to_string(), +            password: "bogus".to_string(), +            inviteCode: None, +            recoveryKey: None, +        }; +        let session = create_account(&mut srv, &req, true).unwrap(); +        Did::from_str(&session.did).unwrap() +    }; +    let bob_did = { +        let req = AccountRequest { +            email: "bob@bogus.com".to_string(), +            handle: "bob.test".to_string(), +            password: "bogus".to_string(), +            inviteCode: None, +            recoveryKey: None, +        }; +        let session = create_account(&mut srv, &req, true).unwrap(); +        Did::from_str(&session.did).unwrap() +    }; +    let carol_did = { +        let req = AccountRequest { +            email: "carol@bogus.com".to_string(), +            handle: "carol.test".to_string(), +            password: "bogus".to_string(), +            inviteCode: None, +            recoveryKey: None, +        }; +        let session = create_account(&mut srv, &req, true).unwrap(); +        Did::from_str(&session.did).unwrap() +    }; + +    // all feeds and timelines should be empty +    let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap(); +    let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap(); +    assert!(alice_feed.feed.is_empty()); +    assert!(alice_timeline.feed.is_empty()); +    let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap(); +    let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap(); +    assert!(bob_feed.feed.is_empty()); +    assert!(bob_timeline.feed.is_empty()); +    let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap(); +    let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap(); +    assert!(carol_feed.feed.is_empty()); +    assert!(carol_timeline.feed.is_empty()); + +    // alice does some posts +    let alice_post1_tid = srv.tid_gen.next_tid(); +    let alice_post2_tid = srv.tid_gen.next_tid(); +    let alice_post3_tid = srv.tid_gen.next_tid(); +    assert!(alice_post1_tid < alice_post2_tid && alice_post2_tid < alice_post3_tid); +    let mutations = vec![ +        Mutation::Create( +            post_nsid.clone(), +            alice_post1_tid.clone(), +            ipld!({"text": "alice first post"}), +        ), +        Mutation::Create( +            post_nsid.clone(), +            alice_post2_tid.clone(), +            ipld!({"text": "alice second post"}), +        ), +        Mutation::Create( +            post_nsid.clone(), +            alice_post3_tid.clone(), +            ipld!({"text": "alice third post"}), +        ), +    ]; +    srv.repo +        .mutate_repo(&alice_did, &mutations, &srv.pds_keypair) +        .unwrap(); +    bsky_mutate_db(&mut srv.atp_db, &alice_did, mutations).unwrap(); + +    // bob follows alice, likes first post, reposts second, replies third +    let alice_post3_uri = format!( +        "at://{}/{}/{}", +        alice_did.to_string(), +        post_nsid.to_string(), +        alice_post3_tid.to_string() +    ); +    let mutations = vec![ +        Mutation::Create( +            follow_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"subject": {"did": alice_did.to_string()}, "createdAt": created_at_now()}), +        ), +        Mutation::Create( +            like_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string())}, "createdAt": created_at_now()}), +        ), +        Mutation::Create( +            repost_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"subject": {"uri": format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post2_tid.to_string())}, "createdAt": created_at_now()}), +        ), +        Mutation::Create( +            post_nsid.clone(), +            srv.tid_gen.next_tid(), +            ipld!({"text": "bob comment on alice post3", "reply": {"parent": {"uri": alice_post3_uri.clone()}, "root": {"uri": alice_post3_uri.clone()}}}), +        ), +    ]; +    srv.repo +        .mutate_repo(&bob_did, &mutations, &srv.pds_keypair) +        .unwrap(); +    bsky_mutate_db(&mut srv.atp_db, &bob_did, mutations).unwrap(); + +    // carol follows bob +    let mutations = vec![Mutation::Create( +        follow_nsid.clone(), +        srv.tid_gen.next_tid(), +        ipld!({"subject": {"did": bob_did.to_string()}, "createdAt": created_at_now()}), +    )]; +    srv.repo +        .mutate_repo(&bob_did, &mutations, &srv.pds_keypair) +        .unwrap(); +    bsky_mutate_db(&mut srv.atp_db, &carol_did, mutations).unwrap(); + +    // test alice profile: counts should be updated +    let alice_profile = bsky_get_profile(&mut srv, &alice_did).unwrap(); +    assert_eq!(alice_profile.followersCount, 1); +    assert_eq!(alice_profile.followsCount, 0); +    assert_eq!(alice_profile.postsCount, 3); + +    // test alice timeline: still empty (?) +    let alice_timeline = bsky_get_timeline(&mut srv, &alice_did).unwrap(); +    println!("{:?}", alice_timeline); +    assert!(alice_timeline.feed.is_empty()); + +    // test alice feed: should have 3 posts, with correct counts +    let alice_feed = bsky_get_author_feed(&mut srv, &alice_did).unwrap(); +    assert_eq!(alice_feed.feed.len(), 3); + +    assert_eq!( +        alice_feed.feed[2].uri, +        format!( +            "at://{}/{}/{}", +            alice_did.to_string(), +            post_nsid.to_string(), +            alice_post1_tid.to_string() +        ) +    ); +    // TODO: CID +    assert_eq!(alice_feed.feed[2].author.did, alice_did.to_string()); +    assert_eq!(alice_feed.feed[2].author.handle, "alice.test"); +    assert_eq!(alice_feed.feed[2].repostedBy, None); +    assert_eq!( +        alice_feed.feed[2].record["text"].as_str().unwrap(), +        "alice first post" +    ); +    assert_eq!(alice_feed.feed[2].embed, None); +    assert_eq!(alice_feed.feed[2].replyCount, 0); +    assert_eq!(alice_feed.feed[2].repostCount, 0); +    assert_eq!(alice_feed.feed[2].likeCount, 1); + +    assert_eq!(alice_feed.feed[1].author.did, alice_did.to_string()); +    assert_eq!(alice_feed.feed[1].replyCount, 0); +    assert_eq!(alice_feed.feed[1].repostCount, 1); +    assert_eq!(alice_feed.feed[1].likeCount, 0); + +    assert_eq!(alice_feed.feed[0].author.did, alice_did.to_string()); +    assert_eq!(alice_feed.feed[0].replyCount, 1); +    assert_eq!(alice_feed.feed[0].repostCount, 0); +    assert_eq!(alice_feed.feed[0].likeCount, 0); + +    // test bob timeline: should include alice posts +    let bob_timeline = bsky_get_timeline(&mut srv, &bob_did).unwrap(); +    println!("BOB TIMELINE ======"); +    for item in bob_timeline.feed.iter() { +        println!("{:?}", item); +    } +    assert_eq!(bob_timeline.feed.len(), 3); +    assert_eq!( +        bob_timeline.feed[2].uri, +        format!( +            "at://{}/{}/{}", +            alice_did.to_string(), +            post_nsid.to_string(), +            alice_post1_tid.to_string() +        ) +    ); +    // TODO: CID +    assert_eq!(bob_timeline.feed[2].author.did, alice_did.to_string()); +    assert_eq!(bob_timeline.feed[2].author.handle, "alice.test"); + +    // test bob feed: should include repost and reply +    let bob_feed = bsky_get_author_feed(&mut srv, &bob_did).unwrap(); +    assert_eq!(bob_feed.feed.len(), 1); +    // TODO: handle reposts +    /* +    assert_eq!(bob_feed.feed.len(), 2); +    assert_eq!(bob_feed.feed[1].uri, format!("at://{}/{}/{}", alice_did.to_string(), post_nsid.to_string(), alice_post1_tid.to_string())); +    // TODO: CID +    assert_eq!(bob_feed.feed[1].author.did, alice_did.to_string()); +    assert_eq!(bob_feed.feed[1].author.handle, "alice.test"); +    assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().did, bob_did.to_string()); +    assert_eq!(bob_feed.feed[1].repostedBy.as_ref().unwrap().handle, "bob.test"); +    // TODO: "is a repost" (check record?) +    */ + +    assert_eq!(bob_feed.feed[0].author.did, bob_did.to_string()); +    assert_eq!(bob_feed.feed[0].author.handle, "bob.test"); + +    // test carol timeline: should include bob's repost and reply +    let carol_timeline = bsky_get_timeline(&mut srv, &carol_did).unwrap(); +    // TODO: handle re-posts (+1 here) +    assert_eq!(carol_timeline.feed.len(), 1); +    // TODO: details + +    // test carol feed: still empty +    let carol_feed = bsky_get_author_feed(&mut srv, &carol_did).unwrap(); +    assert!(carol_feed.feed.is_empty()); +} + +// TODO: post threads (include in the above test?) diff --git a/adenosine-pds/src/db.rs b/adenosine-pds/src/db.rs index 384b498..051588d 100644 --- a/adenosine-pds/src/db.rs +++ b/adenosine-pds/src/db.rs @@ -1,6 +1,6 @@  use crate::models::{FollowRecord, Post, RefRecord};  /// ATP database (as distinct from blockstore) -use crate::{ipld_into_json_value, AtpSession, Did, KeyPair, Tid}; +use crate::{created_at_now, ipld_into_json_value, AtpSession, Did, KeyPair, Tid};  use anyhow::{anyhow, Result};  use lazy_static::lazy_static;  use libipld::cbor::DagCborCodec; @@ -167,15 +167,21 @@ impl AtpDatabase {              let block = Block::<DefaultParams>::encode(DagCborCodec, Code::Sha2_256, &val)?;              let cid = *block.cid();              let post: Post = serde_json::from_value(ipld_into_json_value(val))?; +            let (reply_to_parent_uri, reply_to_root_uri) = match post.reply { +                Some(ref reply) => (Some(reply.parent.uri.clone()), Some(reply.root.uri.clone())), +                None => (None, None), +            };              let mut stmt = self                  .conn -                .prepare_cached("INSERT INTO bsky_post (did, tid, cid, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5)")?; +                .prepare_cached("INSERT INTO bsky_post (did, tid, cid, reply_to_parent_uri, reply_to_root_uri, record_json, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)")?;              stmt.execute(params!(                  did.to_string(),                  tid.to_string(),                  cid.to_string(), +                reply_to_parent_uri, +                reply_to_root_uri,                  serde_json::to_string(&post)?, -                post.createdAt +                post.createdAt.unwrap_or_else(|| created_at_now())              ))?;          } else {              let mut stmt = self diff --git a/adenosine-pds/src/lib.rs b/adenosine-pds/src/lib.rs index c7bb336..738a061 100644 --- a/adenosine-pds/src/lib.rs +++ b/adenosine-pds/src/lib.rs @@ -138,6 +138,16 @@ impl AtpService {          })      } +    pub fn new_ephemeral() -> Result<Self> { +        Ok(AtpService { +            repo: RepoStore::open_ephemeral()?, +            atp_db: AtpDatabase::open_ephemeral()?, +            pds_keypair: KeyPair::new_random(), +            tid_gen: TidLord::new(), +            config: AtpServiceConfig::default(), +        }) +    } +      pub fn run_server(self) -> Result<()> {          let config = self.config.clone();          let srv = Mutex::new(self); @@ -389,6 +399,29 @@ fn xrpc_get_handler(              };              Ok(json!(desc))          } +        // =========== app.bsky methods +        "app.bsky.actor.getProfile" => { +            // TODO did or handle +            let did = Did::from_str(&xrpc_required_param(request, "user")?)?; +            let mut srv = srv.lock().unwrap(); +            Ok(json!(bsky_get_profile(&mut srv, &did)?)) +        } +        "app.bsky.feed.getAuthorFeed" => { +            // TODO did or handle +            let did = Did::from_str(&xrpc_required_param(request, "author")?)?; +            let mut srv = srv.lock().unwrap(); +            Ok(json!(bsky_get_author_feed(&mut srv, &did)?)) +        } +        "app.bsky.feed.getTimeline" => { +            let mut srv = srv.lock().unwrap(); +            let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?; +            Ok(json!(bsky_get_timeline(&mut srv, &auth_did)?)) +        } +        "app.bsky.feed.getPostThread" => { +            let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?; +            let mut srv = srv.lock().unwrap(); +            Ok(json!(bsky_get_thread(&mut srv, &uri, None)?)) +        }          _ => Err(anyhow!(XrpcError::NotFound(format!(              "XRPC endpoint handler not found: {}",              method @@ -609,12 +642,6 @@ fn xrpc_post_handler(              Ok(json!({}))          }          // =========== app.bsky methods -        "app.bsky.actor.getProfile" => { -            // TODO did or handle -            let did = Did::from_str(&xrpc_required_param(request, "user")?)?; -            let mut srv = srv.lock().unwrap(); -            Ok(json!(bsky_get_profile(&mut srv, &did)?)) -        }          "app.bsky.actor.updateProfile" => {              let profile: ProfileRecord = rouille::input::json_input(request)?;              let mut srv = srv.lock().unwrap(); @@ -622,22 +649,6 @@ fn xrpc_post_handler(              bsky_update_profile(&mut srv, &auth_did, profile)?;              Ok(json!({}))          } -        "app.bsky.feed.getAuthorFeed" => { -            // TODO did or handle -            let did = Did::from_str(&xrpc_required_param(request, "author")?)?; -            let mut srv = srv.lock().unwrap(); -            Ok(json!(bsky_get_author_feed(&mut srv, &did)?)) -        } -        "app.bsky.feed.getTimeline" => { -            let mut srv = srv.lock().unwrap(); -            let auth_did = &xrpc_check_auth_header(&mut srv, request, None)?; -            Ok(json!(bsky_get_timeline(&mut srv, &auth_did)?)) -        } -        "app.bsky.feed.getPostThread" => { -            let uri = AtUri::from_str(&xrpc_required_param(request, "uri")?)?; -            let mut srv = srv.lock().unwrap(); -            Ok(json!(bsky_get_thread(&mut srv, &uri, None)?)) -        }          _ => Err(anyhow!(XrpcError::NotFound(format!(              "XRPC endpoint handler not found: {}",              method @@ -810,3 +821,10 @@ fn record_handler(      }      .render()?)  } + +/// Helper to generate the current timestamp as right now, UTC, formatted as a string +pub fn created_at_now() -> String { +    let now = time::OffsetDateTime::now_utc(); +    now.format(&time::format_description::well_known::Rfc3339) +        .unwrap() +} diff --git a/adenosine-pds/src/models.rs b/adenosine-pds/src/models.rs index 116ac53..0f47c2d 100644 --- a/adenosine-pds/src/models.rs +++ b/adenosine-pds/src/models.rs @@ -59,7 +59,8 @@ pub struct RepoBatchWrite {  #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]  pub struct Subject {      pub uri: String, -    pub cid: String, +    // TODO: CID is required +    pub cid: Option<String>,  }  /// Generic over Re-post and Like @@ -122,7 +123,8 @@ pub struct User {  #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]  pub struct FeedItem {      pub uri: String, -    pub cid: String, +    // TODO: cid is required +    pub cid: Option<String>,      pub author: User,      pub repostedBy: Option<User>,      pub record: Value, @@ -139,11 +141,19 @@ pub struct FeedItem {  #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]  pub struct Post {      pub text: String, +    pub reply: Option<PostReply>,      pub createdAt: Option<String>,  }  #[allow(non_snake_case)]  #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)] +pub struct PostReply { +    pub parent: Subject, +    pub root: Subject, +} + +#[allow(non_snake_case)] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]  pub struct PostThread {      pub thread: ThreadItem,  } @@ -152,7 +162,8 @@ pub struct PostThread {  #[derive(Debug, serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq)]  pub struct ThreadItem {      pub uri: String, -    pub cid: String, +    // TODO: CID is required +    pub cid: Option<String>,      pub author: User,      pub record: Value,      //pub embed?: RecordEmbed | ExternalEmbed | UnknownEmbed, | 
