fix(bot): harden webhook processing and purchase defaults

This commit is contained in:
2026-03-11 03:25:16 +04:00
parent dc09a07e21
commit ac5f11f8da
12 changed files with 674 additions and 148 deletions

View File

@@ -3,6 +3,7 @@ import { describe, expect, test } from 'bun:test'
import type { FinanceCommandService } from '@household/application' import type { FinanceCommandService } from '@household/application'
import type { import type {
HouseholdConfigurationRepository, HouseholdConfigurationRepository,
ProcessedBotMessageRepository,
TelegramPendingActionRecord, TelegramPendingActionRecord,
TelegramPendingActionRepository TelegramPendingActionRepository
} from '@household/ports' } from '@household/ports'
@@ -313,6 +314,30 @@ function createPromptRepository(): TelegramPendingActionRepository {
} }
} }
function createProcessedBotMessageRepository(): ProcessedBotMessageRepository {
const claims = new Set<string>()
return {
async claimMessage(input) {
const key = `${input.householdId}:${input.source}:${input.sourceMessageKey}`
if (claims.has(key)) {
return {
claimed: false
}
}
claims.add(key)
return {
claimed: true
}
},
async releaseMessage(input) {
claims.delete(`${input.householdId}:${input.source}:${input.sourceMessageKey}`)
}
}
}
describe('registerDmAssistant', () => { describe('registerDmAssistant', () => {
test('replies with a conversational DM answer and records token usage', async () => { test('replies with a conversational DM answer and records token usage', async () => {
const bot = createTestBot() const bot = createTestBot()
@@ -372,19 +397,26 @@ describe('registerDmAssistant', () => {
await bot.handleUpdate(privateMessageUpdate('How much do I still owe this month?') as never) await bot.handleUpdate(privateMessageUpdate('How much do I still owe this month?') as never)
expect(calls).toHaveLength(2) expect(calls).toHaveLength(3)
expect(calls[0]).toMatchObject({ expect(calls[0]).toMatchObject({
method: 'sendChatAction',
payload: {
chat_id: 123456,
action: 'typing'
}
})
expect(calls[1]).toMatchObject({
method: 'sendMessage', method: 'sendMessage',
payload: { payload: {
chat_id: 123456, chat_id: 123456,
text: 'Working on it...' text: 'Working on it...'
} }
}) })
expect(calls[1]).toMatchObject({ expect(calls[2]).toMatchObject({
method: 'editMessageText', method: 'editMessageText',
payload: { payload: {
chat_id: 123456, chat_id: 123456,
message_id: 1, message_id: 2,
text: 'You still owe 350.00 GEL this cycle.' text: 'You still owe 350.00 GEL this cycle.'
} }
}) })
@@ -462,6 +494,82 @@ describe('registerDmAssistant', () => {
}) })
}) })
test('ignores duplicate deliveries of the same DM update', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
const usageTracker = createInMemoryAssistantUsageTracker()
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
if (method === 'sendMessage') {
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: 123456,
type: 'private'
},
text: (payload as { text?: string }).text ?? 'ok'
}
} as never
}
return {
ok: true,
result: true
} as never
})
registerDmAssistant({
bot,
assistant: {
async respond() {
return {
text: 'You still owe 350.00 GEL this cycle.',
usage: {
inputTokens: 100,
outputTokens: 25,
totalTokens: 125
}
}
}
},
householdConfigurationRepository: createHouseholdRepository(),
messageProcessingRepository: createProcessedBotMessageRepository(),
promptRepository: createPromptRepository(),
financeServiceForHousehold: () => createFinanceService(),
memoryStore: createInMemoryAssistantConversationMemoryStore(12),
rateLimiter: createInMemoryAssistantRateLimiter({
burstLimit: 5,
burstWindowMs: 60_000,
rollingLimit: 50,
rollingWindowMs: 86_400_000
}),
usageTracker
})
const update = privateMessageUpdate('How much do I still owe this month?')
await bot.handleUpdate(update as never)
await bot.handleUpdate(update as never)
expect(calls).toHaveLength(3)
expect(usageTracker.listHouseholdUsage('household-1')).toEqual([
{
householdId: 'household-1',
telegramUserId: '123456',
displayName: 'Stan',
requestCount: 1,
inputTokens: 100,
outputTokens: 25,
totalTokens: 125,
updatedAt: expect.any(String)
}
])
})
test('confirms a pending payment proposal from DM callback', async () => { test('confirms a pending payment proposal from DM callback', async () => {
const bot = createTestBot() const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = [] const calls: Array<{ method: string; payload: unknown }> = []

View File

@@ -3,6 +3,7 @@ import { Money } from '@household/domain'
import type { Logger } from '@household/observability' import type { Logger } from '@household/observability'
import type { import type {
HouseholdConfigurationRepository, HouseholdConfigurationRepository,
ProcessedBotMessageRepository,
TelegramPendingActionRepository TelegramPendingActionRepository
} from '@household/ports' } from '@household/ports'
import type { Bot, Context } from 'grammy' import type { Bot, Context } from 'grammy'
@@ -10,10 +11,12 @@ import type { Bot, Context } from 'grammy'
import { resolveReplyLocale } from './bot-locale' import { resolveReplyLocale } from './bot-locale'
import { getBotTranslations, type BotLocale } from './i18n' import { getBotTranslations, type BotLocale } from './i18n'
import type { AssistantReply, ConversationalAssistant } from './openai-chat-assistant' import type { AssistantReply, ConversationalAssistant } from './openai-chat-assistant'
import { startTypingIndicator } from './telegram-chat-action'
const ASSISTANT_PAYMENT_ACTION = 'assistant_payment_confirmation' as const const ASSISTANT_PAYMENT_ACTION = 'assistant_payment_confirmation' as const
const ASSISTANT_PAYMENT_CONFIRM_CALLBACK_PREFIX = 'assistant_payment:confirm:' const ASSISTANT_PAYMENT_CONFIRM_CALLBACK_PREFIX = 'assistant_payment:confirm:'
const ASSISTANT_PAYMENT_CANCEL_CALLBACK_PREFIX = 'assistant_payment:cancel:' const ASSISTANT_PAYMENT_CANCEL_CALLBACK_PREFIX = 'assistant_payment:cancel:'
const DM_ASSISTANT_MESSAGE_SOURCE = 'telegram-dm-assistant'
const MEMORY_SUMMARY_MAX_CHARS = 1200 const MEMORY_SUMMARY_MAX_CHARS = 1200
interface AssistantConversationTurn { interface AssistantConversationTurn {
@@ -465,6 +468,7 @@ export function registerDmAssistant(options: {
bot: Bot bot: Bot
assistant?: ConversationalAssistant assistant?: ConversationalAssistant
householdConfigurationRepository: HouseholdConfigurationRepository householdConfigurationRepository: HouseholdConfigurationRepository
messageProcessingRepository?: ProcessedBotMessageRepository
promptRepository: TelegramPendingActionRepository promptRepository: TelegramPendingActionRepository
financeServiceForHousehold: (householdId: string) => FinanceCommandService financeServiceForHousehold: (householdId: string) => FinanceCommandService
memoryStore: AssistantConversationMemoryStore memoryStore: AssistantConversationMemoryStore
@@ -641,135 +645,181 @@ export function registerDmAssistant(options: {
} }
const member = memberships[0]! const member = memberships[0]!
const rateLimit = options.rateLimiter.consume(`${member.householdId}:${telegramUserId}`) const updateId = ctx.update.update_id?.toString()
if (!rateLimit.allowed) { const dedupeClaim =
await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs))) options.messageProcessingRepository && typeof updateId === 'string'
return ? {
} repository: options.messageProcessingRepository,
updateId
}
: null
const financeService = options.financeServiceForHousehold(member.householdId) if (dedupeClaim) {
const paymentProposal = await maybeCreatePaymentProposal({ const claim = await dedupeClaim.repository.claimMessage({
rawText: ctx.msg.text, householdId: member.householdId,
householdId: member.householdId, source: DM_ASSISTANT_MESSAGE_SOURCE,
memberId: member.id, sourceMessageKey: dedupeClaim.updateId
financeService,
householdConfigurationRepository: options.householdConfigurationRepository
})
if (paymentProposal.status === 'clarification') {
await ctx.reply(t.paymentClarification)
return
}
if (paymentProposal.status === 'unsupported_currency') {
await ctx.reply(t.paymentUnsupportedCurrency)
return
}
if (paymentProposal.status === 'no_balance') {
await ctx.reply(t.paymentNoBalance)
return
}
if (paymentProposal.status === 'proposal') {
await options.promptRepository.upsertPendingAction({
telegramUserId,
telegramChatId,
action: ASSISTANT_PAYMENT_ACTION,
payload: {
...paymentProposal.payload
},
expiresAt: null
}) })
const amount = Money.fromMinor( if (!claim.claimed) {
BigInt(paymentProposal.payload.amountMinor), options.logger?.info(
paymentProposal.payload.currency {
) event: 'assistant.duplicate_update',
const proposalText = t.paymentProposal( householdId: member.householdId,
paymentProposal.payload.kind, telegramUserId,
amount.toMajorString(), updateId: dedupeClaim.updateId
amount.currency },
) 'Duplicate DM assistant update ignored'
options.memoryStore.appendTurn(telegramUserId, { )
role: 'user', return
text: ctx.msg.text }
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'assistant',
text: proposalText
})
await ctx.reply(proposalText, {
reply_markup: paymentProposalReplyMarkup(locale, paymentProposal.payload.proposalId)
})
return
} }
if (!options.assistant) {
await ctx.reply(t.unavailable)
return
}
const memory = options.memoryStore.get(telegramUserId)
const householdContext = await buildHouseholdContext({
householdId: member.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
locale,
householdConfigurationRepository: options.householdConfigurationRepository,
financeService
})
const pendingReply = await sendAssistantProcessingReply(ctx, t.processing)
try { try {
const reply = await options.assistant.respond({ const rateLimit = options.rateLimiter.consume(`${member.householdId}:${telegramUserId}`)
locale, if (!rateLimit.allowed) {
householdContext, await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs)))
memorySummary: memory.summary, return
recentTurns: memory.turns, }
userMessage: ctx.msg.text
})
options.usageTracker.record({ const financeService = options.financeServiceForHousehold(member.householdId)
const paymentProposal = await maybeCreatePaymentProposal({
rawText: ctx.msg.text,
householdId: member.householdId, householdId: member.householdId,
telegramUserId, memberId: member.id,
displayName: member.displayName, financeService,
usage: reply.usage householdConfigurationRepository: options.householdConfigurationRepository
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'user',
text: ctx.msg.text
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'assistant',
text: reply.text
}) })
options.logger?.info( if (paymentProposal.status === 'clarification') {
{ await ctx.reply(t.paymentClarification)
event: 'assistant.reply', return
}
if (paymentProposal.status === 'unsupported_currency') {
await ctx.reply(t.paymentUnsupportedCurrency)
return
}
if (paymentProposal.status === 'no_balance') {
await ctx.reply(t.paymentNoBalance)
return
}
if (paymentProposal.status === 'proposal') {
await options.promptRepository.upsertPendingAction({
telegramUserId,
telegramChatId,
action: ASSISTANT_PAYMENT_ACTION,
payload: {
...paymentProposal.payload
},
expiresAt: null
})
const amount = Money.fromMinor(
BigInt(paymentProposal.payload.amountMinor),
paymentProposal.payload.currency
)
const proposalText = t.paymentProposal(
paymentProposal.payload.kind,
amount.toMajorString(),
amount.currency
)
options.memoryStore.appendTurn(telegramUserId, {
role: 'user',
text: ctx.msg.text
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'assistant',
text: proposalText
})
await ctx.reply(proposalText, {
reply_markup: paymentProposalReplyMarkup(locale, paymentProposal.payload.proposalId)
})
return
}
if (!options.assistant) {
await ctx.reply(t.unavailable)
return
}
const memory = options.memoryStore.get(telegramUserId)
const typingIndicator = startTypingIndicator(ctx)
let pendingReply: PendingAssistantReply | null = null
try {
const householdContext = await buildHouseholdContext({
householdId: member.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
locale,
householdConfigurationRepository: options.householdConfigurationRepository,
financeService
})
pendingReply = await sendAssistantProcessingReply(ctx, t.processing)
const reply = await options.assistant.respond({
locale,
householdContext,
memorySummary: memory.summary,
recentTurns: memory.turns,
userMessage: ctx.msg.text
})
options.usageTracker.record({
householdId: member.householdId, householdId: member.householdId,
telegramUserId, telegramUserId,
inputTokens: reply.usage.inputTokens, displayName: member.displayName,
outputTokens: reply.usage.outputTokens, usage: reply.usage
totalTokens: reply.usage.totalTokens })
}, options.memoryStore.appendTurn(telegramUserId, {
'DM assistant reply generated' role: 'user',
) text: ctx.msg.text
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'assistant',
text: reply.text
})
await finalizeAssistantReply(ctx, pendingReply, reply.text) options.logger?.info(
{
event: 'assistant.reply',
householdId: member.householdId,
telegramUserId,
inputTokens: reply.usage.inputTokens,
outputTokens: reply.usage.outputTokens,
totalTokens: reply.usage.totalTokens
},
'DM assistant reply generated'
)
await finalizeAssistantReply(ctx, pendingReply, reply.text)
} catch (error) {
options.logger?.error(
{
event: 'assistant.reply_failed',
householdId: member.householdId,
telegramUserId,
error
},
'DM assistant reply failed'
)
await finalizeAssistantReply(ctx, pendingReply, t.unavailable)
} finally {
typingIndicator.stop()
}
} catch (error) { } catch (error) {
options.logger?.error( if (dedupeClaim) {
{ await dedupeClaim.repository.releaseMessage({
event: 'assistant.reply_failed',
householdId: member.householdId, householdId: member.householdId,
telegramUserId, source: DM_ASSISTANT_MESSAGE_SOURCE,
error sourceMessageKey: dedupeClaim.updateId
}, })
'DM assistant reply failed' }
)
await finalizeAssistantReply(ctx, pendingReply, t.unavailable) throw error
} }
}) })
} }

View File

@@ -15,6 +15,7 @@ import {
createDbAnonymousFeedbackRepository, createDbAnonymousFeedbackRepository,
createDbFinanceRepository, createDbFinanceRepository,
createDbHouseholdConfigurationRepository, createDbHouseholdConfigurationRepository,
createDbProcessedBotMessageRepository,
createDbReminderDispatchRepository, createDbReminderDispatchRepository,
createDbTelegramPendingActionRepository createDbTelegramPendingActionRepository
} from '@household/adapters-db' } from '@household/adapters-db'
@@ -85,7 +86,9 @@ const bot = createTelegramBot(
getLogger('telegram'), getLogger('telegram'),
householdConfigurationRepositoryClient?.repository householdConfigurationRepositoryClient?.repository
) )
const webhookHandler = webhookCallback(bot, 'std/http') const webhookHandler = webhookCallback(bot, 'std/http', {
onTimeout: 'return'
})
const financeRepositoryClients = new Map<string, ReturnType<typeof createDbFinanceRepository>>() const financeRepositoryClients = new Map<string, ReturnType<typeof createDbFinanceRepository>>()
const financeServices = new Map<string, ReturnType<typeof createFinanceCommandService>>() const financeServices = new Map<string, ReturnType<typeof createFinanceCommandService>>()
const paymentConfirmationServices = new Map< const paymentConfirmationServices = new Map<
@@ -110,6 +113,10 @@ const telegramPendingActionRepositoryClient =
runtime.databaseUrl && (runtime.anonymousFeedbackEnabled || runtime.assistantEnabled) runtime.databaseUrl && (runtime.anonymousFeedbackEnabled || runtime.assistantEnabled)
? createDbTelegramPendingActionRepository(runtime.databaseUrl!) ? createDbTelegramPendingActionRepository(runtime.databaseUrl!)
: null : null
const processedBotMessageRepositoryClient =
runtime.databaseUrl && runtime.assistantEnabled
? createDbProcessedBotMessageRepository(runtime.databaseUrl!)
: null
const assistantMemoryStore = createInMemoryAssistantConversationMemoryStore( const assistantMemoryStore = createInMemoryAssistantConversationMemoryStore(
runtime.assistantMemoryMaxTurns runtime.assistantMemoryMaxTurns
) )
@@ -203,6 +210,10 @@ if (telegramPendingActionRepositoryClient) {
shutdownTasks.push(telegramPendingActionRepositoryClient.close) shutdownTasks.push(telegramPendingActionRepositoryClient.close)
} }
if (processedBotMessageRepositoryClient) {
shutdownTasks.push(processedBotMessageRepositoryClient.close)
}
if (runtime.databaseUrl && householdConfigurationRepositoryClient) { if (runtime.databaseUrl && householdConfigurationRepositoryClient) {
const purchaseRepositoryClient = createPurchaseMessageRepository(runtime.databaseUrl!) const purchaseRepositoryClient = createPurchaseMessageRepository(runtime.databaseUrl!)
shutdownTasks.push(purchaseRepositoryClient.close) shutdownTasks.push(purchaseRepositoryClient.close)
@@ -366,21 +377,40 @@ if (
householdConfigurationRepositoryClient && householdConfigurationRepositoryClient &&
telegramPendingActionRepositoryClient telegramPendingActionRepositoryClient
) { ) {
registerDmAssistant({ if (processedBotMessageRepositoryClient) {
bot, registerDmAssistant({
householdConfigurationRepository: householdConfigurationRepositoryClient.repository, bot,
promptRepository: telegramPendingActionRepositoryClient.repository, householdConfigurationRepository: householdConfigurationRepositoryClient.repository,
financeServiceForHousehold, messageProcessingRepository: processedBotMessageRepositoryClient.repository,
memoryStore: assistantMemoryStore, promptRepository: telegramPendingActionRepositoryClient.repository,
rateLimiter: assistantRateLimiter, financeServiceForHousehold,
usageTracker: assistantUsageTracker, memoryStore: assistantMemoryStore,
...(conversationalAssistant rateLimiter: assistantRateLimiter,
? { usageTracker: assistantUsageTracker,
assistant: conversationalAssistant ...(conversationalAssistant
} ? {
: {}), assistant: conversationalAssistant
logger: getLogger('dm-assistant') }
}) : {}),
logger: getLogger('dm-assistant')
})
} else {
registerDmAssistant({
bot,
householdConfigurationRepository: householdConfigurationRepositoryClient.repository,
promptRepository: telegramPendingActionRepositoryClient.repository,
financeServiceForHousehold,
memoryStore: assistantMemoryStore,
rateLimiter: assistantRateLimiter,
usageTracker: assistantUsageTracker,
...(conversationalAssistant
? {
assistant: conversationalAssistant
}
: {}),
logger: getLogger('dm-assistant')
})
}
} }
const server = createBotWebhookServer({ const server = createBotWebhookServer({

View File

@@ -1,6 +1,7 @@
import { describe, expect, test } from 'bun:test' import { describe, expect, test } from 'bun:test'
import { import {
buildPurchaseInterpretationInput,
createOpenAiPurchaseInterpreter, createOpenAiPurchaseInterpreter,
type PurchaseInterpretation type PurchaseInterpretation
} from './openai-purchase-interpreter' } from './openai-purchase-interpreter'
@@ -15,6 +16,22 @@ function successfulResponse(payload: unknown): Response {
} }
describe('createOpenAiPurchaseInterpreter', () => { describe('createOpenAiPurchaseInterpreter', () => {
test('includes clarification context when provided', () => {
expect(
buildPurchaseInterpretationInput('лари', {
recentMessages: ['Купил сосисоны, отдал 45 кровных']
})
).toBe(
[
'Recent relevant messages from the same sender in this purchase topic:',
'1. Купил сосисоны, отдал 45 кровных',
'',
'Latest message to interpret:',
'лари'
].join('\n')
)
})
test('parses nested responses api content output', async () => { test('parses nested responses api content output', async () => {
const interpreter = createOpenAiPurchaseInterpreter('test-key', 'gpt-5-mini') const interpreter = createOpenAiPurchaseInterpreter('test-key', 'gpt-5-mini')
expect(interpreter).toBeDefined() expect(interpreter).toBeDefined()
@@ -139,4 +156,48 @@ describe('createOpenAiPurchaseInterpreter', () => {
globalThis.fetch = originalFetch globalThis.fetch = originalFetch
} }
}) })
test('defaults omitted purchase currency to the household currency', async () => {
const interpreter = createOpenAiPurchaseInterpreter('test-key', 'gpt-5-mini')
expect(interpreter).toBeDefined()
const originalFetch = globalThis.fetch
globalThis.fetch = (async () =>
successfulResponse({
output: [
{
content: [
{
text: JSON.stringify({
decision: 'clarification',
amountMinor: '4500',
currency: null,
itemDescription: 'сосисоны',
confidence: 85,
clarificationQuestion: 'В какой валюте 45?'
})
}
]
}
]
})) as unknown as typeof fetch
try {
const result = await interpreter!('Купил сосисоны, отдал 45 кровных', {
defaultCurrency: 'GEL'
})
expect(result).toEqual<PurchaseInterpretation>({
decision: 'purchase',
amountMinor: 4500n,
currency: 'GEL',
itemDescription: 'сосисоны',
confidence: 85,
parserMode: 'llm',
clarificationQuestion: null
})
} finally {
globalThis.fetch = originalFetch
}
})
}) })

View File

@@ -12,10 +12,15 @@ export interface PurchaseInterpretation {
clarificationQuestion: string | null clarificationQuestion: string | null
} }
export interface PurchaseClarificationContext {
recentMessages: readonly string[]
}
export type PurchaseMessageInterpreter = ( export type PurchaseMessageInterpreter = (
rawText: string, rawText: string,
options: { options: {
defaultCurrency: 'GEL' | 'USD' defaultCurrency: 'GEL' | 'USD'
clarificationContext?: PurchaseClarificationContext
} }
) => Promise<PurchaseInterpretation | null> ) => Promise<PurchaseInterpretation | null>
@@ -51,6 +56,49 @@ function normalizeConfidence(value: number): number {
return Math.max(0, Math.min(100, Math.round(scaled))) return Math.max(0, Math.min(100, Math.round(scaled)))
} }
function resolveMissingCurrency(input: {
decision: PurchaseInterpretationDecision
amountMinor: bigint | null
currency: 'GEL' | 'USD' | null
itemDescription: string | null
defaultCurrency: 'GEL' | 'USD'
}): 'GEL' | 'USD' | null {
if (input.currency !== null) {
return input.currency
}
if (
input.decision === 'not_purchase' ||
input.amountMinor === null ||
input.itemDescription === null
) {
return null
}
return input.defaultCurrency
}
export function buildPurchaseInterpretationInput(
rawText: string,
clarificationContext?: PurchaseClarificationContext
): string {
if (!clarificationContext || clarificationContext.recentMessages.length === 0) {
return rawText
}
const history = clarificationContext.recentMessages
.map((message, index) => `${index + 1}. ${message}`)
.join('\n')
return [
'Recent relevant messages from the same sender in this purchase topic:',
history,
'',
'Latest message to interpret:',
rawText
].join('\n')
}
export function createOpenAiPurchaseInterpreter( export function createOpenAiPurchaseInterpreter(
apiKey: string | undefined, apiKey: string | undefined,
model: string model: string
@@ -72,9 +120,12 @@ export function createOpenAiPurchaseInterpreter(
{ {
role: 'system', role: 'system',
content: [ content: [
'You classify a single Telegram message from a household shared-purchases topic.', 'You classify a purchase candidate from a household shared-purchases topic.',
'Decide whether the message is a real shared purchase, needs clarification, or is not a shared purchase at all.', 'Decide whether the latest message is a real shared purchase, needs clarification, or is not a shared purchase at all.',
`The household default currency is ${options.defaultCurrency}, but do not assume that omitted currency means ${options.defaultCurrency}.`, `The household default currency is ${options.defaultCurrency}. If a real purchase clearly omits currency, use ${options.defaultCurrency}.`,
'If recent messages from the same sender are provided, treat them as clarification context for the latest message.',
'If the latest message is a complete standalone purchase on its own, ignore the earlier clarification context.',
'If the latest message answers a previous clarification, combine it with the earlier messages to resolve the purchase.',
'Use clarification when the amount, currency, item, or overall intent is missing or uncertain.', 'Use clarification when the amount, currency, item, or overall intent is missing or uncertain.',
'Return a clarification question in the same language as the user message when clarification is needed.', 'Return a clarification question in the same language as the user message when clarification is needed.',
'Return only JSON that matches the schema.' 'Return only JSON that matches the schema.'
@@ -82,7 +133,7 @@ export function createOpenAiPurchaseInterpreter(
}, },
{ {
role: 'user', role: 'user',
content: rawText content: buildPurchaseInterpretationInput(rawText, options.clarificationContext)
} }
], ],
text: { text: {
@@ -165,19 +216,35 @@ export function createOpenAiPurchaseInterpreter(
return null return null
} }
const amountMinor = asOptionalBigInt(parsedJson.amountMinor)
const itemDescription = normalizeOptionalText(parsedJson.itemDescription)
const currency = resolveMissingCurrency({
decision: parsedJson.decision,
amountMinor,
currency: normalizeCurrency(parsedJson.currency),
itemDescription,
defaultCurrency: options.defaultCurrency
})
const decision =
parsedJson.decision === 'clarification' &&
amountMinor !== null &&
currency !== null &&
itemDescription
? 'purchase'
: parsedJson.decision
const clarificationQuestion = normalizeOptionalText(parsedJson.clarificationQuestion) const clarificationQuestion = normalizeOptionalText(parsedJson.clarificationQuestion)
if (parsedJson.decision === 'clarification' && !clarificationQuestion) { if (decision === 'clarification' && !clarificationQuestion) {
return null return null
} }
return { return {
decision: parsedJson.decision, decision,
amountMinor: asOptionalBigInt(parsedJson.amountMinor), amountMinor,
currency: normalizeCurrency(parsedJson.currency), currency,
itemDescription: normalizeOptionalText(parsedJson.itemDescription), itemDescription,
confidence: normalizeConfidence(parsedJson.confidence), confidence: normalizeConfidence(parsedJson.confidence),
parserMode: 'llm', parserMode: 'llm',
clarificationQuestion clarificationQuestion: decision === 'clarification' ? clarificationQuestion : null
} }
} }
} }

View File

@@ -447,8 +447,16 @@ describe('registerPurchaseTopicIngestion', () => {
await bot.handleUpdate(purchaseUpdate('Bought toilet paper 30 gel') as never) await bot.handleUpdate(purchaseUpdate('Bought toilet paper 30 gel') as never)
expect(calls).toHaveLength(2) expect(calls).toHaveLength(3)
expect(calls[0]).toMatchObject({ expect(calls[0]).toMatchObject({
method: 'sendChatAction',
payload: {
chat_id: Number(config.householdChatId),
action: 'typing',
message_thread_id: config.purchaseTopicId
}
})
expect(calls[1]).toMatchObject({
method: 'sendMessage', method: 'sendMessage',
payload: { payload: {
chat_id: Number(config.householdChatId), chat_id: Number(config.householdChatId),
@@ -458,11 +466,11 @@ describe('registerPurchaseTopicIngestion', () => {
} }
} }
}) })
expect(calls[1]).toMatchObject({ expect(calls[2]).toMatchObject({
method: 'editMessageText', method: 'editMessageText',
payload: { payload: {
chat_id: Number(config.householdChatId), chat_id: Number(config.householdChatId),
message_id: 1, message_id: 2,
text: 'I think this shared purchase was: toilet paper - 30.00 GEL. Confirm or cancel below.', text: 'I think this shared purchase was: toilet paper - 30.00 GEL. Confirm or cancel below.',
reply_markup: { reply_markup: {
inline_keyboard: [ inline_keyboard: [

View File

@@ -1,5 +1,5 @@
import { instantFromEpochSeconds, instantToDate, Money, type Instant } from '@household/domain' import { instantFromEpochSeconds, instantToDate, Money, type Instant } from '@household/domain'
import { and, eq } from 'drizzle-orm' import { and, desc, eq } from 'drizzle-orm'
import type { Bot, Context } from 'grammy' import type { Bot, Context } from 'grammy'
import type { Logger } from '@household/observability' import type { Logger } from '@household/observability'
import type { import type {
@@ -13,6 +13,7 @@ import type {
PurchaseInterpretation, PurchaseInterpretation,
PurchaseMessageInterpreter PurchaseMessageInterpreter
} from './openai-purchase-interpreter' } from './openai-purchase-interpreter'
import { startTypingIndicator } from './telegram-chat-action'
const PURCHASE_CONFIRM_CALLBACK_PREFIX = 'purchase:confirm:' const PURCHASE_CONFIRM_CALLBACK_PREFIX = 'purchase:confirm:'
const PURCHASE_CANCEL_CALLBACK_PREFIX = 'purchase:cancel:' const PURCHASE_CANCEL_CALLBACK_PREFIX = 'purchase:cancel:'
@@ -146,6 +147,9 @@ interface PurchasePersistenceDecision {
needsReview: boolean needsReview: boolean
} }
const CLARIFICATION_CONTEXT_MAX_AGE_MS = 30 * 60_000
const MAX_CLARIFICATION_CONTEXT_MESSAGES = 3
function normalizeInterpretation( function normalizeInterpretation(
interpretation: PurchaseInterpretation | null, interpretation: PurchaseInterpretation | null,
parserError: string | null parserError: string | null
@@ -459,6 +463,47 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
prepare: false prepare: false
}) })
async function getClarificationContext(
record: PurchaseTopicRecord
): Promise<readonly string[] | undefined> {
const rows = await db
.select({
rawText: schema.purchaseMessages.rawText,
messageSentAt: schema.purchaseMessages.messageSentAt,
ingestedAt: schema.purchaseMessages.ingestedAt
})
.from(schema.purchaseMessages)
.where(
and(
eq(schema.purchaseMessages.householdId, record.householdId),
eq(schema.purchaseMessages.senderTelegramUserId, record.senderTelegramUserId),
eq(schema.purchaseMessages.telegramThreadId, record.threadId),
eq(schema.purchaseMessages.processingStatus, 'clarification_needed')
)
)
.orderBy(
desc(schema.purchaseMessages.messageSentAt),
desc(schema.purchaseMessages.ingestedAt)
)
.limit(MAX_CLARIFICATION_CONTEXT_MESSAGES)
const currentMessageTimestamp = instantToDate(record.messageSentAt).getTime()
const recentMessages = rows
.filter((row) => {
const referenceTimestamp = (row.messageSentAt ?? row.ingestedAt)?.getTime()
return (
referenceTimestamp !== undefined &&
currentMessageTimestamp - referenceTimestamp >= 0 &&
currentMessageTimestamp - referenceTimestamp <= CLARIFICATION_CONTEXT_MAX_AGE_MS
)
})
.reverse()
.map((row) => row.rawText.trim())
.filter((value) => value.length > 0)
return recentMessages.length > 0 ? recentMessages : undefined
}
async function getStoredMessage( async function getStoredMessage(
purchaseMessageId: string purchaseMessageId: string
): Promise<StoredPurchaseMessageRow | null> { ): Promise<StoredPurchaseMessageRow | null> {
@@ -595,10 +640,18 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
const senderMemberId = matchedMember[0]?.id ?? null const senderMemberId = matchedMember[0]?.id ?? null
let parserError: string | null = null let parserError: string | null = null
const clarificationContext = interpreter ? await getClarificationContext(record) : undefined
const interpretation = interpreter const interpretation = interpreter
? await interpreter(record.rawText, { ? await interpreter(record.rawText, {
defaultCurrency: defaultCurrency ?? 'GEL' defaultCurrency: defaultCurrency ?? 'GEL',
...(clarificationContext
? {
clarificationContext: {
recentMessages: clarificationContext
}
}
: {})
}).catch((error) => { }).catch((error) => {
parserError = error instanceof Error ? error.message : 'Unknown interpreter error' parserError = error instanceof Error ? error.message : 'Unknown interpreter error'
return null return null
@@ -988,6 +1041,8 @@ export function registerPurchaseTopicIngestion(
return return
} }
const typingIndicator = options.interpreter ? startTypingIndicator(ctx) : null
try { try {
const pendingReply = options.interpreter const pendingReply = options.interpreter
? await sendPurchaseProcessingReply(ctx, getBotTranslations('en').purchase.processing) ? await sendPurchaseProcessingReply(ctx, getBotTranslations('en').purchase.processing)
@@ -1006,6 +1061,8 @@ export function registerPurchaseTopicIngestion(
}, },
'Failed to ingest purchase topic message' 'Failed to ingest purchase topic message'
) )
} finally {
typingIndicator?.stop()
} }
}) })
} }
@@ -1049,6 +1106,8 @@ export function registerConfiguredPurchaseTopicIngestion(
return return
} }
const typingIndicator = options.interpreter ? startTypingIndicator(ctx) : null
try { try {
const billingSettings = await householdConfigurationRepository.getHouseholdBillingSettings( const billingSettings = await householdConfigurationRepository.getHouseholdBillingSettings(
record.householdId record.householdId
@@ -1080,6 +1139,8 @@ export function registerConfiguredPurchaseTopicIngestion(
}, },
'Failed to ingest purchase topic message' 'Failed to ingest purchase topic message'
) )
} finally {
typingIndicator?.stop()
} }
}) })
} }

View File

@@ -0,0 +1,55 @@
import type { Context } from 'grammy'
const TYPING_REFRESH_INTERVAL_MS = 4_000
export interface ActiveChatAction {
stop(): void
}
export function startTypingIndicator(ctx: Context): ActiveChatAction {
const chatId = ctx.chat?.id
if (!chatId) {
return {
stop() {}
}
}
const messageThreadId =
ctx.msg && 'message_thread_id' in ctx.msg ? ctx.msg.message_thread_id : undefined
let active = true
const sendTypingAction = async () => {
if (!active) {
return
}
const options =
messageThreadId !== undefined
? {
message_thread_id: messageThreadId
}
: undefined
try {
await ctx.api.sendChatAction(chatId, 'typing', options)
} catch {}
}
void sendTypingAction()
const interval = setInterval(() => {
void sendTypingAction()
}, TYPING_REFRESH_INTERVAL_MS)
if (typeof interval.unref === 'function') {
interval.unref()
}
return {
stop() {
active = false
clearInterval(interval)
}
}
}

View File

@@ -1,5 +1,6 @@
export { createDbAnonymousFeedbackRepository } from './anonymous-feedback-repository' export { createDbAnonymousFeedbackRepository } from './anonymous-feedback-repository'
export { createDbFinanceRepository } from './finance-repository' export { createDbFinanceRepository } from './finance-repository'
export { createDbHouseholdConfigurationRepository } from './household-config-repository' export { createDbHouseholdConfigurationRepository } from './household-config-repository'
export { createDbProcessedBotMessageRepository } from './processed-bot-message-repository'
export { createDbReminderDispatchRepository } from './reminder-dispatch-repository' export { createDbReminderDispatchRepository } from './reminder-dispatch-repository'
export { createDbTelegramPendingActionRepository } from './telegram-pending-action-repository' export { createDbTelegramPendingActionRepository } from './telegram-pending-action-repository'

View File

@@ -0,0 +1,58 @@
import { and, eq } from 'drizzle-orm'
import { createDbClient, schema } from '@household/db'
import type { ProcessedBotMessageRepository } from '@household/ports'
export function createDbProcessedBotMessageRepository(databaseUrl: string): {
repository: ProcessedBotMessageRepository
close: () => Promise<void>
} {
const { db, queryClient } = createDbClient(databaseUrl, {
max: 3,
prepare: false
})
const repository: ProcessedBotMessageRepository = {
async claimMessage(input) {
const rows = await db
.insert(schema.processedBotMessages)
.values({
householdId: input.householdId,
source: input.source,
sourceMessageKey: input.sourceMessageKey,
payloadHash: input.payloadHash ?? null
})
.onConflictDoNothing({
target: [
schema.processedBotMessages.householdId,
schema.processedBotMessages.source,
schema.processedBotMessages.sourceMessageKey
]
})
.returning({ id: schema.processedBotMessages.id })
return {
claimed: rows.length > 0
}
},
async releaseMessage(input) {
await db
.delete(schema.processedBotMessages)
.where(
and(
eq(schema.processedBotMessages.householdId, input.householdId),
eq(schema.processedBotMessages.source, input.source),
eq(schema.processedBotMessages.sourceMessageKey, input.sourceMessageKey)
)
)
}
}
return {
repository,
close: async () => {
await queryClient.end({ timeout: 5 })
}
}
}

View File

@@ -6,6 +6,12 @@ export {
type ReminderTarget, type ReminderTarget,
type ReminderType type ReminderType
} from './reminders' } from './reminders'
export type {
ClaimProcessedBotMessageInput,
ClaimProcessedBotMessageResult,
ProcessedBotMessageRepository,
ReleaseProcessedBotMessageInput
} from './processed-bot-messages'
export { export {
HOUSEHOLD_TOPIC_ROLES, HOUSEHOLD_TOPIC_ROLES,
type HouseholdConfigurationRepository, type HouseholdConfigurationRepository,

View File

@@ -0,0 +1,21 @@
export interface ClaimProcessedBotMessageInput {
householdId: string
source: string
sourceMessageKey: string
payloadHash?: string | null
}
export interface ClaimProcessedBotMessageResult {
claimed: boolean
}
export interface ReleaseProcessedBotMessageInput {
householdId: string
source: string
sourceMessageKey: string
}
export interface ProcessedBotMessageRepository {
claimMessage(input: ClaimProcessedBotMessageInput): Promise<ClaimProcessedBotMessageResult>
releaseMessage(input: ReleaseProcessedBotMessageInput): Promise<void>
}