feat(WHE-22): ingest configured topic messages with idempotent persistence

This commit is contained in:
2026-03-05 04:32:58 +04:00
parent e72c145e3d
commit 67e9e2dee2
16 changed files with 1838 additions and 20 deletions

View File

@@ -3,6 +3,11 @@ export interface BotRuntimeConfig {
telegramBotToken: string
telegramWebhookSecret: string
telegramWebhookPath: string
databaseUrl?: string
householdId?: string
telegramHouseholdChatId?: string
telegramPurchaseTopicId?: number
purchaseTopicIngestionEnabled: boolean
}
function parsePort(raw: string | undefined): number {
@@ -26,11 +31,56 @@ function requireValue(value: string | undefined, key: string): string {
return value
}
function parseOptionalTopicId(raw: string | undefined): number | undefined {
if (!raw) {
return undefined
}
const parsed = Number(raw)
if (!Number.isInteger(parsed) || parsed <= 0) {
throw new Error(`Invalid TELEGRAM_PURCHASE_TOPIC_ID value: ${raw}`)
}
return parsed
}
function parseOptionalValue(value: string | undefined): string | undefined {
const trimmed = value?.trim()
return trimmed && trimmed.length > 0 ? trimmed : undefined
}
export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRuntimeConfig {
return {
const databaseUrl = parseOptionalValue(env.DATABASE_URL)
const householdId = parseOptionalValue(env.HOUSEHOLD_ID)
const telegramHouseholdChatId = parseOptionalValue(env.TELEGRAM_HOUSEHOLD_CHAT_ID)
const telegramPurchaseTopicId = parseOptionalTopicId(env.TELEGRAM_PURCHASE_TOPIC_ID)
const purchaseTopicIngestionEnabled =
databaseUrl !== undefined &&
householdId !== undefined &&
telegramHouseholdChatId !== undefined &&
telegramPurchaseTopicId !== undefined
const runtime: BotRuntimeConfig = {
port: parsePort(env.PORT),
telegramBotToken: requireValue(env.TELEGRAM_BOT_TOKEN, 'TELEGRAM_BOT_TOKEN'),
telegramWebhookSecret: requireValue(env.TELEGRAM_WEBHOOK_SECRET, 'TELEGRAM_WEBHOOK_SECRET'),
telegramWebhookPath: env.TELEGRAM_WEBHOOK_PATH ?? '/webhook/telegram'
telegramWebhookPath: env.TELEGRAM_WEBHOOK_PATH ?? '/webhook/telegram',
purchaseTopicIngestionEnabled
}
if (databaseUrl !== undefined) {
runtime.databaseUrl = databaseUrl
}
if (householdId !== undefined) {
runtime.householdId = householdId
}
if (telegramHouseholdChatId !== undefined) {
runtime.telegramHouseholdChatId = telegramHouseholdChatId
}
if (telegramPurchaseTopicId !== undefined) {
runtime.telegramPurchaseTopicId = telegramPurchaseTopicId
}
return runtime
}

View File

@@ -2,12 +2,37 @@ import { webhookCallback } from 'grammy'
import { createTelegramBot } from './bot'
import { getBotRuntimeConfig } from './config'
import {
createPurchaseMessageRepository,
registerPurchaseTopicIngestion
} from './purchase-topic-ingestion'
import { createBotWebhookServer } from './server'
const runtime = getBotRuntimeConfig()
const bot = createTelegramBot(runtime.telegramBotToken)
const webhookHandler = webhookCallback(bot, 'std/http')
let closePurchaseRepository: (() => Promise<void>) | undefined
if (runtime.purchaseTopicIngestionEnabled) {
const purchaseRepositoryClient = createPurchaseMessageRepository(runtime.databaseUrl!)
closePurchaseRepository = purchaseRepositoryClient.close
registerPurchaseTopicIngestion(
bot,
{
householdId: runtime.householdId!,
householdChatId: runtime.telegramHouseholdChatId!,
purchaseTopicId: runtime.telegramPurchaseTopicId!
},
purchaseRepositoryClient.repository
)
} else {
console.warn(
'Purchase topic ingestion is disabled. Set DATABASE_URL, HOUSEHOLD_ID, TELEGRAM_HOUSEHOLD_CHAT_ID, and TELEGRAM_PURCHASE_TOPIC_ID to enable.'
)
}
const server = createBotWebhookServer({
webhookPath: runtime.telegramWebhookPath,
webhookSecret: runtime.telegramWebhookSecret,
@@ -23,6 +48,10 @@ if (import.meta.main) {
console.log(
`@household/bot webhook server started on :${runtime.port} path=${runtime.telegramWebhookPath}`
)
process.on('SIGTERM', () => {
void closePurchaseRepository?.()
})
}
export { server }

View File

@@ -0,0 +1,53 @@
import { describe, expect, test } from 'bun:test'
import {
extractPurchaseTopicCandidate,
type PurchaseTopicCandidate
} from './purchase-topic-ingestion'
const config = {
householdId: '11111111-1111-4111-8111-111111111111',
householdChatId: '-10012345',
purchaseTopicId: 777
}
function candidate(overrides: Partial<PurchaseTopicCandidate> = {}): PurchaseTopicCandidate {
return {
updateId: 1,
chatId: '-10012345',
messageId: '10',
threadId: '777',
senderTelegramUserId: '10002',
rawText: 'Bought toilet paper 30 gel',
messageSentAt: new Date('2026-03-05T00:00:00.000Z'),
...overrides
}
}
describe('extractPurchaseTopicCandidate', () => {
test('returns record when message belongs to configured topic', () => {
const record = extractPurchaseTopicCandidate(candidate(), config)
expect(record).not.toBeNull()
expect(record?.householdId).toBe(config.householdId)
expect(record?.rawText).toBe('Bought toilet paper 30 gel')
})
test('skips message from other chat', () => {
const record = extractPurchaseTopicCandidate(candidate({ chatId: '-10099999' }), config)
expect(record).toBeNull()
})
test('skips message from other topic', () => {
const record = extractPurchaseTopicCandidate(candidate({ threadId: '778' }), config)
expect(record).toBeNull()
})
test('skips blank text after trim', () => {
const record = extractPurchaseTopicCandidate(candidate({ rawText: ' ' }), config)
expect(record).toBeNull()
})
})

View File

@@ -0,0 +1,179 @@
import { and, eq } from 'drizzle-orm'
import type { Bot, Context } from 'grammy'
import { createDbClient, schema } from '@household/db'
export interface PurchaseTopicIngestionConfig {
householdId: string
householdChatId: string
purchaseTopicId: number
}
export interface PurchaseTopicCandidate {
updateId: number
chatId: string
messageId: string
threadId: string
senderTelegramUserId: string
senderDisplayName?: string
rawText: string
messageSentAt: Date
}
export interface PurchaseTopicRecord extends PurchaseTopicCandidate {
householdId: string
}
export interface PurchaseMessageIngestionRepository {
save(record: PurchaseTopicRecord): Promise<'created' | 'duplicate'>
}
export function extractPurchaseTopicCandidate(
value: PurchaseTopicCandidate,
config: PurchaseTopicIngestionConfig
): PurchaseTopicRecord | null {
if (value.chatId !== config.householdChatId) {
return null
}
if (value.threadId !== String(config.purchaseTopicId)) {
return null
}
const normalizedText = value.rawText.trim()
if (normalizedText.length === 0) {
return null
}
return {
...value,
rawText: normalizedText,
householdId: config.householdId
}
}
export function createPurchaseMessageRepository(databaseUrl: string): {
repository: PurchaseMessageIngestionRepository
close: () => Promise<void>
} {
const { db, queryClient } = createDbClient(databaseUrl, {
max: 5,
prepare: false
})
const repository: PurchaseMessageIngestionRepository = {
async save(record) {
const matchedMember = await db
.select({ id: schema.members.id })
.from(schema.members)
.where(
and(
eq(schema.members.householdId, record.householdId),
eq(schema.members.telegramUserId, record.senderTelegramUserId)
)
)
.limit(1)
const senderMemberId = matchedMember[0]?.id ?? null
const inserted = await db
.insert(schema.purchaseMessages)
.values({
householdId: record.householdId,
senderMemberId,
senderTelegramUserId: record.senderTelegramUserId,
senderDisplayName: record.senderDisplayName,
rawText: record.rawText,
telegramChatId: record.chatId,
telegramMessageId: record.messageId,
telegramThreadId: record.threadId,
telegramUpdateId: String(record.updateId),
messageSentAt: record.messageSentAt,
processingStatus: 'pending'
})
.onConflictDoNothing({
target: [
schema.purchaseMessages.householdId,
schema.purchaseMessages.telegramChatId,
schema.purchaseMessages.telegramMessageId
]
})
.returning({ id: schema.purchaseMessages.id })
return inserted.length > 0 ? 'created' : 'duplicate'
}
}
return {
repository,
close: async () => {
await queryClient.end({ timeout: 5 })
}
}
}
function toCandidateFromContext(ctx: Context): PurchaseTopicCandidate | null {
const message = ctx.message
if (!message || !('text' in message)) {
return null
}
if (!message.is_topic_message || message.message_thread_id === undefined) {
return null
}
const senderTelegramUserId = ctx.from?.id?.toString()
if (!senderTelegramUserId) {
return null
}
const senderDisplayName = [ctx.from?.first_name, ctx.from?.last_name]
.filter((part) => !!part && part.trim().length > 0)
.join(' ')
const candidate: PurchaseTopicCandidate = {
updateId: ctx.update.update_id,
chatId: message.chat.id.toString(),
messageId: message.message_id.toString(),
threadId: message.message_thread_id.toString(),
senderTelegramUserId,
rawText: message.text,
messageSentAt: new Date(message.date * 1000)
}
if (senderDisplayName.length > 0) {
candidate.senderDisplayName = senderDisplayName
}
return candidate
}
export function registerPurchaseTopicIngestion(
bot: Bot,
config: PurchaseTopicIngestionConfig,
repository: PurchaseMessageIngestionRepository
): void {
bot.on('message:text', async (ctx) => {
const candidate = toCandidateFromContext(ctx)
if (!candidate) {
return
}
const record = extractPurchaseTopicCandidate(candidate, config)
if (!record) {
return
}
try {
const status = await repository.save(record)
if (status === 'created') {
console.log(
`purchase topic message ingested chat=${record.chatId} thread=${record.threadId} message=${record.messageId}`
)
}
} catch (error) {
console.error('Failed to ingest purchase topic message', error)
}
})
}