1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
import type { Job } from "bullmq";
import { Worker } from "bullmq";
import { eq } from "drizzle-orm";
import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
import logger from "@hoarder/shared/logger";
import {
queueConnectionDetails,
SearchIndexingQueue,
zSearchIndexingRequestSchema,
} from "@hoarder/shared/queues";
import { getSearchIdxClient } from "@hoarder/shared/search";
/**
 * Factory for the BullMQ worker that consumes the search indexing queue.
 */
export class SearchIndexingWorker {
  /**
   * Builds the worker bound to `SearchIndexingQueue` and wires up logging for
   * the "completed" and "failed" job events.
   *
   * The worker is created with `autorun: false`, so it is returned without
   * being started here; the caller is responsible for starting it.
   *
   * @returns The configured worker instance.
   */
  static async build() {
    logger.info("Starting search indexing worker ...");

    const worker = new Worker<ZSearchIndexingRequest, void>(
      SearchIndexingQueue.name,
      runSearchIndexing,
      {
        connection: queueConnectionDetails,
        autorun: false,
      },
    );

    worker.on("completed", (job) => {
      const jobId = job?.id ?? "unknown";
      logger.info(`[search][${jobId}] Completed successfully`);
    });

    worker.on("failed", (job, error) => {
      // The job may be undefined in the failure callback, hence the fallback id.
      const jobId = job?.id ?? "unknown";
      logger.error(`[search][${jobId}] search job failed: ${error}`);
    });

    return worker;
  }
}
/**
 * Loads a bookmark (with its link, text and tags) from the database and
 * submits it to the search index as a single document.
 *
 * @param searchClient - A configured (non-null) search index client.
 * @param bookmarkId - Id of the bookmark to index.
 * @throws Error if no bookmark with the given id exists.
 */
async function runIndex(
  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
  bookmarkId: string,
) {
  const bookmark = await db.query.bookmarks.findFirst({
    where: eq(bookmarks.id, bookmarkId),
    with: {
      link: true,
      text: true,
      tagsOnBookmarks: {
        with: {
          tag: true,
        },
      },
    },
  });

  if (!bookmark) {
    throw new Error(`Bookmark ${bookmarkId} not found`);
  }

  // Await the submission so that a rejected request fails the job (and is
  // reported by the worker's "failed" handler) instead of surfacing as an
  // unhandled promise rejection after the job was already marked completed.
  await searchClient.addDocuments([
    {
      id: bookmark.id,
      userId: bookmark.userId,
      ...(bookmark.link
        ? {
            url: bookmark.link.url,
            title: bookmark.link.title,
            description: bookmark.link.description,
            content: bookmark.link.content,
          }
        : undefined),
      // Spread after the link fields: when a text payload is present, its
      // text takes over the `content` field.
      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
      note: bookmark.note,
      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
    },
  ]);
}
/**
 * Removes the document for the given bookmark from the search index.
 *
 * @param searchClient - A configured (non-null) search index client.
 * @param bookmarkId - Id of the bookmark whose document should be deleted.
 */
async function runDelete(
  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
  bookmarkId: string,
) {
  // Issue the delete request, then wait for it so failures reject this call.
  const pendingDeletion = searchClient.deleteDocument(bookmarkId);
  await pendingDeletion;
}
/**
 * Job processor for the search indexing queue.
 *
 * Validates the job payload against `zSearchIndexingRequestSchema`, obtains
 * the search client, and dispatches to `runIndex` or `runDelete` based on the
 * request type. When no search client is available the job is a no-op.
 *
 * @param job - The queued job carrying the indexing request.
 * @throws Error if the job payload does not match the request schema.
 */
async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
  const jobId = job.id ?? "unknown";

  // Guard: reject payloads that do not match the expected schema.
  const parsed = zSearchIndexingRequestSchema.safeParse(job.data);
  if (!parsed.success) {
    throw new Error(
      `[search][${jobId}] Got malformed job request: ${parsed.error.toString()}`,
    );
  }

  // Guard: without a search client there is nothing to index against.
  const client = await getSearchIdxClient();
  if (!client) {
    logger.debug(
      `[search][${jobId}] Search is not configured, nothing to do now`,
    );
    return;
  }

  const payload = parsed.data;
  const targetId = payload.bookmarkId;

  if (payload.type === "index") {
    await runIndex(client, targetId);
  } else if (payload.type === "delete") {
    await runDelete(client, targetId);
  }
}
|