feed-generatorでBlueskyのカスタムフィードを作ろう

Blueskyも登録してみました。

フィードが便利なのですが、フィードの作れるskyfeedは7日保存だったりたまに重くなるので自鯖に立ててみました。Bluesky公式のfeed-generatorとDockerを使用します。

参考サイト:

Github – ATProto Feed Generator

feed-generatorを使ってBlueskyのCustomFeedを作る

カスタムフィードを作ってみる?

feed-generatorはリアルタイムで全ての投稿が流れてくる中から自鯖のDBにデータを選んで保存し、そのDBからフィードを作成してフィードのjsonを返します。

AT protocolの思想にcraving indexerがBluesky鯖の外部に作成されて分散型でも鯖を跨いでデータを収集し検索や統計に使用されるという要素が有り、フィード作成はその思想の一端を担っている印象です。(現時点まだ鯖間の連合はできないですが…)

続きを読む

カスタムフィードの作成

準備

あんまり重く無さそうなので自宅鯖ではなくVPSサーバーに直置きにしました。

カスタムフィードはDockerで動かし、ホストOS側のNginxでリバースプロキシさせます。Docker, NginxはホストOSにインストール済みの状態です。

まずドメインが必要なので「bsfeed.estampie.work」の証明書の取得をしておきました。Nginxで80ポート設定とAレコードがちゃんと設定出来ていれば大丈夫なはず。

sudo certbot --nginx -d bsfeed.estampie.work 

feed-generatorの編集

Github – ATProto Feed Generatorをforkして自鯖にcloneします。とりあえず元のアドレスで

mkdir bluesky
cd bluesky
git clone https://github.com/bluesky-social/feed-generator.git

feed-generatorフォルダ内のファイルを編集していきます。

/.env

環境変数ファイルを設定します。.env.example.envにリネームして書き換えます。

DIDはhttps://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle={ID(user.bsky.socialなど)}にアクセスすると表示される「did:plc:…(ランダム文字列)」です。

DBのsqliteはデフォルトではメモリへ保存なので.dbファイルへと書き換えます。

# Whichever port you want to run this on 
FEEDGEN_PORT=3000

# Change this to use a different bind address
FEEDGEN_LISTENHOST="feed-generator"

# Set to something like db.sqlite to store persistently
FEEDGEN_SQLITE_LOCATION="./db/mydb.db" // ここが:memory:だとメモリ保存になってしまうのでDB名を設定

# Don't change unless you're working in a different environment than the primary Bluesky network
FEEDGEN_SUBSCRIPTION_ENDPOINT="wss://bsky.network"

# Set this to the hostname that you intend to run the service at
FEEDGEN_HOSTNAME="bsfeed.estampie.work" // 自鯖ホスト

# Set this to the DID of the account you'll use to publish the feed
# You can find your accounts DID by going to
# https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=${YOUR_HANDLE}
FEEDGEN_PUBLISHER_DID="自分のDID" // ここも書き換え

# Only use this if you want a service did different from did:web
# FEEDGEN_SERVICE_DID="did:plc:abcde..."

# Delay between reconnect attempts to the firehose subscription endpoint (in milliseconds)
FEEDGEN_SUBSCRIPTION_RECONNECT_DELAY=3000

ここからなのですが、サンプルのsrc/algos/whats-alf.tsを読むとsrc/subscription.tsで先に投稿を「alf」を含む投稿で絞ってDBに取り込んでから表示しているようなのですが、Skyfeedのように後から条件を増やしたいのでちょっと編集します。

subscription.tsである程度投稿を条件で絞り込んでDBに取り込み、src/algos/*.tsでフィルタリングしてフィードを作成します。本文でフィルタリングするためにDBのカラムにtextフィールドを作成し、本文も取り込んであげます。

下記コメント部分を追記していきます。

src/db/migrations.ts

migrations['001'] = {
  async up(db: Kysely<unknown>) {
    await db.schema
      .createTable('post')
      .addColumn('uri', 'varchar', (col) => col.primaryKey())
      .addColumn('cid', 'varchar', (col) => col.notNull())
      .addColumn('text', 'text', (col) => col.notNull()) // textフィールドを追加
      .addColumn('replyParent', 'varchar')
      .addColumn('replyRoot', 'varchar')
      .addColumn('indexedAt', 'varchar', (col) => col.notNull())
      .execute()

src/db/schema.ts

export type Post = {
  uri: string
  cid: string
  text: string; // textフィールドを追加
  replyParent: string | null
  replyRoot: string | null
  indexedAt: string
}

src/subscription.ts

subscription.ts.filter((create) =>の先にDBに取り込む投稿の条件を付与します。

下記では本文に「ひよこ」「にわとり」を含む投稿をDBに取り込みます。

  async handleEvent(evt: RepoEvent) {
    if (!isCommit(evt)) return
    const ops = await getOpsByType(evt)

    const postsToDelete = ops.posts.deletes.map((del) => del.uri)
    const postsToCreate = ops.posts.creates
      .filter((create) => {
        return create.record.text.includes('ひよこ') || create.record.text.includes('にわとり');
      })
      .map((create) => {
        return {
          uri: create.uri,
          cid: create.cid,
          text: create.record.text, // textフィールドを追加
          replyParent: create.record?.reply?.parent.uri ?? null,
          replyRoot: create.record?.reply?.root.uri ?? null,
          indexedAt: new Date().toISOString(),
        }
      })

フィードの作成

ひよこフィードとにわとりフィードをそれぞれ作成する想定です。

src/algos/whats-alf.tsをコピーして同フォルダに「hiyoko.ts」「niwatori.ts」を作成します

DBに取り込んだ投稿からさらにフィルタリングの条件を追加できます。

src/algos/hiyoko.ts

import { InvalidRequestError } from '@atproto/xrpc-server'
import { QueryParams } from '../lexicon/types/app/bsky/feed/getFeedSkeleton'
import { AppContext } from '../config'

export const shortname = 'hiyoko' // ここを編集

export const handler = async (ctx: AppContext, params: QueryParams) => {
    let builder = ctx.db
        .selectFrom('post')
        .selectAll()
        .where('text', 'like', '%ひよこ%') // DBに取り込んだ投稿からさらにフィルタリングの条件を追加
        .where('text', 'not like', '%うさぎ%') // 特定の文字列を除外もできる
        .orderBy('indexedAt', 'desc')
        .orderBy('cid', 'desc')
        .limit(params.limit);

    if (params.cursor) {
        const [indexedAt, cid] = params.cursor.split('::')
        if (!indexedAt || !cid) {
            throw new InvalidRequestError('malformed cursor')
        }
        const timeStr = new Date(parseInt(indexedAt, 10)).toISOString()
        builder = builder
            .where('post.indexedAt', '<', timeStr)
            .where('post.cid', '<', cid)
    }
    const res = await builder.execute()

    const feed = res.map((row) => ({
        post: row.uri,
    }))

    let cursor: string | undefined
    const last = res.at(-1)
    if (last) {
        cursor = `${new Date(last.indexedAt).getTime()}::${last.cid}`
    }

    return {
        cursor,
        feed,
    }
}

niwatori.tsも同様に作成します。

src/algos/index.ts

同フォルダのindex.tsに上記で作成したフィードを追加します。

import { AppContext } from '../config'
import {
  QueryParams,
  OutputSchema as AlgoOutput,
} from '../lexicon/types/app/bsky/feed/getFeedSkeleton' // 以下書き換え
import * as hiyoko from './hiyoko'
import * as niwatori from './niwatori'

type AlgoHandler = (ctx: AppContext, params: QueryParams) => Promise<AlgoOutput>

const algos: Record<string, AlgoHandler> = { // 以下書き換え
  [hiyoko.shortname]: hiyoko.handler,
  [niwatori.shortname]: niwatori.handler,
}

export default algos

Dockerfileの作成

feed-generatorの一つ上のディレクトリにDockerfiledocker-compose.yamlを作成します。

Dockerfileはこちらの参考サイトのものをそのままコピペさせていただいています。

Dockerfile

FROM node:18-alpine
WORKDIR /build
COPY ./feed-generator /build

RUN npm install
RUN npm run build


FROM node:18-alpine
ENV NODE_ENV production
WORKDIR /app
RUN apk add --no-cache tini
COPY --chown=node:node --from=0 /build/dist /app
COPY --chown=node:node --from=0 /build/package.json /app/package.json
COPY --chown=node:node --from=0 /build/.env /app/.env

RUN npm install --omit=dev

USER node
ENTRYPOINT ["/sbin/tini", "--"]
CMD ["node", "/app/index.js"]

NginxはホストOS側で設定するのでNginxはDockerでは作成しません。なのでdocker-compose.yamlは下記にしました。restart: alwaysを入れて終了しても自動再起動するようにします。

docker-compose.yaml

version: "3.8"

services:
  feed-generator:
    container_name: feed-generator
    build:
      context: ./
      dockerfile: Dockerfile
    restart: always
    volumes:
      - /app/db
    environment:
      - NODE_ENV=production
    ports:
      - "3000:3000"

networks:
  default:

Docker起動

Dockerコンテナを起動します。

sudo docker compose up -d

Nginxの設定

/etc/nginx/conf.d/bsfeed.confを作成しURLからDockerコンテナへリバースプロキシさせます。

/etc/nginx/conf.d/bsfeed.conf

 server {
     listen 443 ssl;
     server_name bsfeed.estampie.work;
     ssl_certificate (証明書のパス);
     ssl_certificate_key (証明書鍵のパス);
 
     location / {
         proxy_pass http://127.0.0.1:3000/;
         #CORS
         add_header Access-Control-Allow-Origin "*";
         add_header Access-Control-Allow-Methods "POST, GET, OPTIONS";
         add_header Access-Control-Allow-Headers "Origin, Authorization, Accept";
         add_header Access-Control-Allow-Credentials true;
 
         proxy_set_header Host $host;
         proxy_set_header X-Real-IP $remote_addr;
         proxy_set_header X-Forwarded-For        $proxy_add_x_forwarded_for; 
     }
      listen 80;
      listen [::]:80;
 
      root /var/www/html;
 
      # Add index.php to the list if you are using PHP
      index index.html index.htm index.nginx-debian.html;
 
      server_name _;
 }

sudo systemctl restart nginxで有効化させます。

フィードの確認

無事作成されればhttps://{FEEDGEN_HOSTNAME}/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://{FEEDGEN_PUBLISHER_DID}/app.bsky.feed.generator/{algos/*.tsに記述したshortname}にアクセスするとjsonが表示される筈です。

作成した以降の投稿が表示されるのでテスト投稿を行っておきましょう。

「https://上記フィードアドレス/hiyoko」「https://上記フィードアドレス/niwatori」でそれぞれのフィードにアクセスできます。

フィードの公開

blueskyで公開するための情報をpublishFeedGen.tsに記載していきます。

scripts/publishFeedGen.ts

ひよこフィードを登録します。

(2024/2/9時点)forkをGithubにそのまま上げている場合直書きのためパスワード見えてしまうので注意。パスワードを.envへ移す公式プルリク#50を使用すると良いかも

  // YOUR bluesky handle
  // Ex: user.bsky.social
  const handle = '自分のユーザーID'

  // YOUR bluesky password, or preferably an App Password (found in your client settings)
  // Ex: abcd-1234-efgh-5678
  const password = 'アプリパスワード'

  // A short name for the record that will show in urls
  // Lowercase with no spaces.
  // Ex: whats-hot
  const recordName = 'hiyoko' // hiyoko.tsのshortname

  // A display name for your feed
  // Ex: What's Hot
  const displayName = 'ひよこ' // フィードの名前

  // (Optional) A description of your feed
  // Ex: Top trending content from the whole network
  const description = 'ひよこの話題を集めるフィード' // 説明

  // (Optional) The path to an image to be used as your feed's avatar
  // Ex: ~/path/to/avatar.jpeg
  const avatar: string = ''

アイコンも設定したい場合ビルド前にfeed-generatorフォルダに画像を入れ「const avatar: string = '/publish/画像ファイル名.png’」で指定してあげると設定出来ます。jpgまたはpngです。

参考サイトを参考にfeed-generatorの一つ上のフォルダに「pub.Dockerfile」を作成し下記を実行します。(feed-generator本体も同じフォルダにDockerfileとして存在するため)

pub.Dockerfile

FROM node:18-alpine
WORKDIR /publish
COPY feed-generator /publish

RUN npm install

CMD ["npm", "run", "publishFeed"]
sudo docker build -t publish_feed -f pub.Dockerfile .
sudo docker run publish_feed

上記で「All done 🎉」が表示されれば完了です。niwatoriも公開したい場合publishFeedGen.tsを書き換えて再度ビルドからDockerコマンドを実行すればOKです。説明文など公開情報を更新するときも再度実行します。

Blueskyのフィード検索から検索してみましょう🥳

公開停止は公式プルリク#77を導入すれば使えます。

元のストリームから取り込む条件を変更したり、DBカラムを追加したりすることで色々編集できそうです。

色々改造

フィードを作る前の投稿をDBに取り込む (2/12追記)

Blueskyの検索はhttps://api.bsky.app/xrpc/app.bsky.feed.searchPosts?q=クエリ&limit=100で検索結果のjsonが取得できるのでこれをDBに取り込みます。

subscription.tsと同じフォルダにsearchtodb.tsを作成します。

パラメータにはcursorがあり、limitの数以上取り込めるので使用します。たくさん取り込みたくなければlimitを指定してこの部分の処理は削除して下さい。

既にDBに取り込まれているデータはuriを見て除外します。

searchtodb.ts

// フィード作成前の投稿もDBに追加するスクリプト
import https from 'https';
import dotenv from 'dotenv';
import { createDb, Database } from './db';

dotenv.config();

async function fetchSearchResults(query: string, limit: number = 100, cursor: string = ''): Promise<any[]> {
    let url = `https://api.bsky.app/xrpc/app.bsky.feed.searchPosts?q=${encodeURIComponent(query)}&limit=${limit}`;
    if (cursor) {
        url += `&cursor=${encodeURIComponent(cursor)}`;
    }

    return new Promise((resolve, reject) => {
        https.get(url, (res) => {
            let data = '';
            res.on('data', (chunk) => {
                data += chunk;
            });
            res.on('end', async () => {
                try {
                    const result = JSON.parse(data);
                    if (result.cursor && result.posts.length > 0) {
                        const nextResults = await fetchSearchResults(query, limit, result.cursor);
                        resolve([...result.posts, ...nextResults]);
                    } else {
                        resolve(result.posts);
                    }
                } catch (error) {
                    reject(error);
                }
            });
        }).on('error', (error) => {
            reject(error);
        });
    });
}

async function saveSearchResultsToDb(db: Database, posts: any[]) {
    for (const post of posts) {

        // 同じuriを持つレコードがデータベースに存在するか確認
        const exists = await db
            .selectFrom('post')
            .select('uri')
            .where('uri', '=', post.uri)
            .execute();

        // レコードが存在しない場合のみ挿入を実行
        if (exists.length === 0) {
            await db
                .insertInto('post')
                .values({
                    uri: post.uri,
                    cid: post.cid,
                    text: post.record.text,
                    indexedAt: post.indexedAt,
                })
                .execute();
            // 取り込めたtextを表示(消してもOK)
            console.log(`Added post to database: ${post.record.text}`);
        }
    }
}

async function main() {
    try {
        const dbLocation = maybeStr(process.env.FEEDGEN_SQLITE_LOCATION);
        if (!dbLocation) {
            console.error('Database location is not defined.');
            process.exit(1);
        }
        const db = createDb(dbLocation);
        const queries = ['クエリ1', 'クエリ2']; // 検索クエリのリスト

        for (const query of queries) {
            const searchResults = await fetchSearchResults(query);
            await saveSearchResultsToDb(db, searchResults);
            console.log(`Search results for query "${query}" saved to database.`);
        }
    } catch (error) {
        console.error('An error occurred:', error);
        process.exit(1); // エラーが発生した場合終了
    }
}

const maybeStr = (val?: string) => val;

main();

下記コマンドで実行できます。

sudo docker compose build
sudo docker compose stop && sudo docker compose up -d
sudo docker exec feed-generator node searchtodb.js

🎉

画像の添付情報も取り込む (2/12追記)

データーベースにmediaカラムを追加し、postのembedにapp.bsky.embed.imagesがあればmediaカラムに「image」というテキストを追加します。

※既に取り込まれたポストでは無効なので全部最初からやっています。既存DBに追加する場合、migrations['002’]にカラム追加処理を書いて、別途既存のポストに画像の有無を付与するスクリプト作成が必要です。

migrations.tsに下記を追加

migrations['001'] = {
  async up(db: Kysely<unknown>) {
    await db.schema
      .createTable('post')
      // ~略~
      .addColumn('media', 'varchar')

schema.tsに下記を追加

export type Post = {
  // ~略~
  media: string | null

subscription.ts書き換え

      .map((create) => {
        // 画像の添付があればmediaカラムにimageを挿入
        const media = create.record.embed && create.record.embed.$type === 'app.bsky.embed.images' ? 'image' : null;

        return {
          uri: create.uri,
          cid: create.cid,
          text: create.record.text,
          replyParent: create.record?.reply?.parent.uri ?? null,
          replyRoot: create.record?.reply?.root.uri ?? null,
          indexedAt: new Date().toISOString(),
          media: media,
        };
      });

フィード作成前の投稿もDBに取り込むsearchtodb.ts書き換え

async function saveSearchResultsToDb(db: Database, posts: any[]) {
    for (const post of posts) {
        // 画像の添付があればmediaカラムにimageを挿入
        const media = post.record.embed && post.record.embed.$type === 'app.bsky.embed.images' ? 'image' : null;

        // 同じuriを持つレコードがデータベースに存在するか確認
        const exists = await db
            .selectFrom('post')
            .select('uri')
            .where('uri', '=', post.uri)
            .execute();

        // レコードが存在しない場合のみ挿入を実行
        if (exists.length === 0) {
            await db
                .insertInto('post')
                .values({
                    uri: post.uri,
                    cid: post.cid,
                    text: post.record.text,
                    indexedAt: post.indexedAt,
                    media: media,
                })
                .execute();
            // 取り込めたtextを表示(消してもOK)
            console.log(`Added post to database: ${post.record.text}`);
        }
    }
}

あとはargos/*.tsのフィルタ部分でmediaカラムをチェックするようにすれば画像添付の投稿のみを集めたフィードが作成できます。

export const handler = async (ctx: AppContext, params: QueryParams) => {
    let builder = ctx.db
        .selectFrom('post')
        .selectAll()
        // ~略~
        .where('media', 'like', '%image%')

🥳