feat(WHE-23): add hybrid purchase parser with persisted parse metadata

This commit is contained in:
2026-03-05 04:43:57 +04:00
parent 3b1b6468db
commit ebb6ce4ce6
14 changed files with 1881 additions and 7 deletions

View File

@@ -1,3 +1,4 @@
import { parsePurchaseMessage, type PurchaseParserLlmFallback } from '@household/application'
import { and, eq } from 'drizzle-orm'
import type { Bot, Context } from 'grammy'
@@ -25,7 +26,10 @@ export interface PurchaseTopicRecord extends PurchaseTopicCandidate {
}
export interface PurchaseMessageIngestionRepository {
save(record: PurchaseTopicRecord): Promise<'created' | 'duplicate'>
save(
record: PurchaseTopicRecord,
llmFallback?: PurchaseParserLlmFallback
): Promise<'created' | 'duplicate'>
}
export function extractPurchaseTopicCandidate(
@@ -52,6 +56,10 @@ export function extractPurchaseTopicCandidate(
}
}
function needsReviewAsInt(value: boolean): number {
return value ? 1 : 0
}
export function createPurchaseMessageRepository(databaseUrl: string): {
repository: PurchaseMessageIngestionRepository
close: () => Promise<void>
@@ -62,7 +70,7 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
})
const repository: PurchaseMessageIngestionRepository = {
async save(record) {
async save(record, llmFallback) {
const matchedMember = await db
.select({ id: schema.members.id })
.from(schema.members)
@@ -75,6 +83,30 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
.limit(1)
const senderMemberId = matchedMember[0]?.id ?? null
let parserError: string | null = null
const parsed = await parsePurchaseMessage(
{
rawText: record.rawText
},
llmFallback
? {
llmFallback
}
: {}
).catch((error) => {
parserError = error instanceof Error ? error.message : 'Unknown parser error'
return null
})
const processingStatus =
parserError !== null
? 'parse_failed'
: parsed === null
? 'needs_review'
: parsed.needsReview
? 'needs_review'
: 'parsed'
const inserted = await db
.insert(schema.purchaseMessages)
@@ -89,7 +121,14 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
telegramThreadId: record.threadId,
telegramUpdateId: String(record.updateId),
messageSentAt: record.messageSentAt,
processingStatus: 'pending'
parsedAmountMinor: parsed?.amountMinor,
parsedCurrency: parsed?.currency,
parsedItemDescription: parsed?.itemDescription,
parserMode: parsed?.parserMode,
parserConfidence: parsed?.confidence,
needsReview: needsReviewAsInt(parsed?.needsReview ?? true),
parserError,
processingStatus
})
.onConflictDoNothing({
target: [
@@ -151,7 +190,10 @@ function toCandidateFromContext(ctx: Context): PurchaseTopicCandidate | null {
export function registerPurchaseTopicIngestion(
bot: Bot,
config: PurchaseTopicIngestionConfig,
repository: PurchaseMessageIngestionRepository
repository: PurchaseMessageIngestionRepository,
options: {
llmFallback?: PurchaseParserLlmFallback
} = {}
): void {
bot.on('message:text', async (ctx) => {
const candidate = toCandidateFromContext(ctx)
@@ -165,7 +207,7 @@ export function registerPurchaseTopicIngestion(
}
try {
const status = await repository.save(record)
const status = await repository.save(record, options.llmFallback)
if (status === 'created') {
console.log(