aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/searchWorker.ts
blob: ae916441604f82d8cb7dec814978689e45acbe32 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import type { Job } from "bullmq";
import { Worker } from "bullmq";
import { eq } from "drizzle-orm";

import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
import logger from "@hoarder/shared/logger";
import {
  queueConnectionDetails,
  SearchIndexingQueue,
  zSearchIndexingRequestSchema,
} from "@hoarder/shared/queues";
import { getSearchIdxClient } from "@hoarder/shared/search";

export class SearchIndexingWorker {
  static async build() {
    logger.info("Starting search indexing worker ...");
    const worker = new Worker<ZSearchIndexingRequest, void>(
      SearchIndexingQueue.name,
      runSearchIndexing,
      {
        connection: queueConnectionDetails,
        autorun: false,
      },
    );

    worker.on("completed", (job) => {
      const jobId = job?.id ?? "unknown";
      logger.info(`[search][${jobId}] Completed successfully`);
    });

    worker.on("failed", (job, error) => {
      const jobId = job?.id ?? "unknown";
      logger.error(`[search][${jobId}] openai job failed: ${error}`);
    });

    return worker;
  }
}

async function runIndex(
  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
  bookmarkId: string,
) {
  const bookmark = await db.query.bookmarks.findFirst({
    where: eq(bookmarks.id, bookmarkId),
    with: {
      link: true,
      text: true,
      tagsOnBookmarks: {
        with: {
          tag: true,
        },
      },
    },
  });

  if (!bookmark) {
    throw new Error(`Bookmark ${bookmarkId} not found`);
  }

  searchClient.addDocuments([
    {
      id: bookmark.id,
      userId: bookmark.userId,
      ...(bookmark.link
        ? {
            url: bookmark.link.url,
            title: bookmark.link.title,
            description: bookmark.link.description,
            content: bookmark.link.content,
          }
        : undefined),
      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
      note: bookmark.note,
      createdAt: bookmark.createdAt.toISOString(),
      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
    },
  ]);
}

async function runDelete(
  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
  bookmarkId: string,
) {
  await searchClient.deleteDocument(bookmarkId);
}

async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
  const jobId = job.id ?? "unknown";

  const request = zSearchIndexingRequestSchema.safeParse(job.data);
  if (!request.success) {
    throw new Error(
      `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
    );
  }

  const searchClient = await getSearchIdxClient();
  if (!searchClient) {
    logger.debug(
      `[search][${jobId}] Search is not configured, nothing to do now`,
    );
    return;
  }

  const bookmarkId = request.data.bookmarkId;
  switch (request.data.type) {
    case "index": {
      await runIndex(searchClient, bookmarkId);
      break;
    }
    case "delete": {
      await runDelete(searchClient, bookmarkId);
      break;
    }
  }
}