diff --git a/apps/bot/src/dm-assistant.test.ts b/apps/bot/src/dm-assistant.test.ts index e929180..5c8128f 100644 --- a/apps/bot/src/dm-assistant.test.ts +++ b/apps/bot/src/dm-assistant.test.ts @@ -3,6 +3,7 @@ import { describe, expect, test } from 'bun:test' import type { FinanceCommandService } from '@household/application' import type { HouseholdConfigurationRepository, + ProcessedBotMessageRepository, TelegramPendingActionRecord, TelegramPendingActionRepository } from '@household/ports' @@ -313,6 +314,30 @@ function createPromptRepository(): TelegramPendingActionRepository { } } +function createProcessedBotMessageRepository(): ProcessedBotMessageRepository { + const claims = new Set() + + return { + async claimMessage(input) { + const key = `${input.householdId}:${input.source}:${input.sourceMessageKey}` + if (claims.has(key)) { + return { + claimed: false + } + } + + claims.add(key) + + return { + claimed: true + } + }, + async releaseMessage(input) { + claims.delete(`${input.householdId}:${input.source}:${input.sourceMessageKey}`) + } + } +} + describe('registerDmAssistant', () => { test('replies with a conversational DM answer and records token usage', async () => { const bot = createTestBot() @@ -372,19 +397,26 @@ describe('registerDmAssistant', () => { await bot.handleUpdate(privateMessageUpdate('How much do I still owe this month?') as never) - expect(calls).toHaveLength(2) + expect(calls).toHaveLength(3) expect(calls[0]).toMatchObject({ + method: 'sendChatAction', + payload: { + chat_id: 123456, + action: 'typing' + } + }) + expect(calls[1]).toMatchObject({ method: 'sendMessage', payload: { chat_id: 123456, text: 'Working on it...' } }) - expect(calls[1]).toMatchObject({ + expect(calls[2]).toMatchObject({ method: 'editMessageText', payload: { chat_id: 123456, - message_id: 1, + message_id: 2, text: 'You still owe 350.00 GEL this cycle.' } }) @@ -462,6 +494,82 @@ describe('registerDmAssistant', () => { }) }) + test('ignores duplicate deliveries of the same DM update', async () => { + const bot = createTestBot() + const calls: Array<{ method: string; payload: unknown }> = [] + const usageTracker = createInMemoryAssistantUsageTracker() + + bot.api.config.use(async (_prev, method, payload) => { + calls.push({ method, payload }) + + if (method === 'sendMessage') { + return { + ok: true, + result: { + message_id: calls.length, + date: Math.floor(Date.now() / 1000), + chat: { + id: 123456, + type: 'private' + }, + text: (payload as { text?: string }).text ?? 'ok' + } + } as never + } + + return { + ok: true, + result: true + } as never + }) + + registerDmAssistant({ + bot, + assistant: { + async respond() { + return { + text: 'You still owe 350.00 GEL this cycle.', + usage: { + inputTokens: 100, + outputTokens: 25, + totalTokens: 125 + } + } + } + }, + householdConfigurationRepository: createHouseholdRepository(), + messageProcessingRepository: createProcessedBotMessageRepository(), + promptRepository: createPromptRepository(), + financeServiceForHousehold: () => createFinanceService(), + memoryStore: createInMemoryAssistantConversationMemoryStore(12), + rateLimiter: createInMemoryAssistantRateLimiter({ + burstLimit: 5, + burstWindowMs: 60_000, + rollingLimit: 50, + rollingWindowMs: 86_400_000 + }), + usageTracker + }) + + const update = privateMessageUpdate('How much do I still owe this month?') + await bot.handleUpdate(update as never) + await bot.handleUpdate(update as never) + + expect(calls).toHaveLength(3) + expect(usageTracker.listHouseholdUsage('household-1')).toEqual([ + { + householdId: 'household-1', + telegramUserId: '123456', + displayName: 'Stan', + requestCount: 1, + inputTokens: 100, + outputTokens: 25, + totalTokens: 125, + updatedAt: expect.any(String) + } + ]) + }) + test('confirms a pending payment proposal from DM callback', async () => { const bot = createTestBot() const calls: Array<{ method: string; payload: unknown }> = [] diff --git a/apps/bot/src/dm-assistant.ts b/apps/bot/src/dm-assistant.ts index 2e3d7e1..d367787 100644 --- a/apps/bot/src/dm-assistant.ts +++ b/apps/bot/src/dm-assistant.ts @@ -3,6 +3,7 @@ import { Money } from '@household/domain' import type { Logger } from '@household/observability' import type { HouseholdConfigurationRepository, + ProcessedBotMessageRepository, TelegramPendingActionRepository } from '@household/ports' import type { Bot, Context } from 'grammy' @@ -10,10 +11,12 @@ import type { Bot, Context } from 'grammy' import { resolveReplyLocale } from './bot-locale' import { getBotTranslations, type BotLocale } from './i18n' import type { AssistantReply, ConversationalAssistant } from './openai-chat-assistant' +import { startTypingIndicator } from './telegram-chat-action' const ASSISTANT_PAYMENT_ACTION = 'assistant_payment_confirmation' as const const ASSISTANT_PAYMENT_CONFIRM_CALLBACK_PREFIX = 'assistant_payment:confirm:' const ASSISTANT_PAYMENT_CANCEL_CALLBACK_PREFIX = 'assistant_payment:cancel:' +const DM_ASSISTANT_MESSAGE_SOURCE = 'telegram-dm-assistant' const MEMORY_SUMMARY_MAX_CHARS = 1200 interface AssistantConversationTurn { @@ -465,6 +468,7 @@ export function registerDmAssistant(options: { bot: Bot assistant?: ConversationalAssistant householdConfigurationRepository: HouseholdConfigurationRepository + messageProcessingRepository?: ProcessedBotMessageRepository promptRepository: TelegramPendingActionRepository financeServiceForHousehold: (householdId: string) => FinanceCommandService memoryStore: AssistantConversationMemoryStore @@ -641,135 +645,181 @@ export function registerDmAssistant(options: { } const member = memberships[0]! - const rateLimit = options.rateLimiter.consume(`${member.householdId}:${telegramUserId}`) - if (!rateLimit.allowed) { - await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs))) - return - } + const updateId = ctx.update.update_id?.toString() + const dedupeClaim = + options.messageProcessingRepository && typeof updateId === 'string' + ? { + repository: options.messageProcessingRepository, + updateId + } + : null - const financeService = options.financeServiceForHousehold(member.householdId) - const paymentProposal = await maybeCreatePaymentProposal({ - rawText: ctx.msg.text, - householdId: member.householdId, - memberId: member.id, - financeService, - householdConfigurationRepository: options.householdConfigurationRepository - }) - - if (paymentProposal.status === 'clarification') { - await ctx.reply(t.paymentClarification) - return - } - - if (paymentProposal.status === 'unsupported_currency') { - await ctx.reply(t.paymentUnsupportedCurrency) - return - } - - if (paymentProposal.status === 'no_balance') { - await ctx.reply(t.paymentNoBalance) - return - } - - if (paymentProposal.status === 'proposal') { - await options.promptRepository.upsertPendingAction({ - telegramUserId, - telegramChatId, - action: ASSISTANT_PAYMENT_ACTION, - payload: { - ...paymentProposal.payload - }, - expiresAt: null + if (dedupeClaim) { + const claim = await dedupeClaim.repository.claimMessage({ + householdId: member.householdId, + source: DM_ASSISTANT_MESSAGE_SOURCE, + sourceMessageKey: dedupeClaim.updateId }) - const amount = Money.fromMinor( - BigInt(paymentProposal.payload.amountMinor), - paymentProposal.payload.currency - ) - const proposalText = t.paymentProposal( - paymentProposal.payload.kind, - amount.toMajorString(), - amount.currency - ) - options.memoryStore.appendTurn(telegramUserId, { - role: 'user', - text: ctx.msg.text - }) - options.memoryStore.appendTurn(telegramUserId, { - role: 'assistant', - text: proposalText - }) - - await ctx.reply(proposalText, { - reply_markup: paymentProposalReplyMarkup(locale, paymentProposal.payload.proposalId) - }) - return + if (!claim.claimed) { + options.logger?.info( + { + event: 'assistant.duplicate_update', + householdId: member.householdId, + telegramUserId, + updateId: dedupeClaim.updateId + }, + 'Duplicate DM assistant update ignored' + ) + return + } } - if (!options.assistant) { - await ctx.reply(t.unavailable) - return - } - - const memory = options.memoryStore.get(telegramUserId) - const householdContext = await buildHouseholdContext({ - householdId: member.householdId, - memberId: member.id, - memberDisplayName: member.displayName, - locale, - householdConfigurationRepository: options.householdConfigurationRepository, - financeService - }) - const pendingReply = await sendAssistantProcessingReply(ctx, t.processing) - try { - const reply = await options.assistant.respond({ - locale, - householdContext, - memorySummary: memory.summary, - recentTurns: memory.turns, - userMessage: ctx.msg.text - }) + const rateLimit = options.rateLimiter.consume(`${member.householdId}:${telegramUserId}`) + if (!rateLimit.allowed) { + await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs))) + return + } - options.usageTracker.record({ + const financeService = options.financeServiceForHousehold(member.householdId) + const paymentProposal = await maybeCreatePaymentProposal({ + rawText: ctx.msg.text, householdId: member.householdId, - telegramUserId, - displayName: member.displayName, - usage: reply.usage - }) - options.memoryStore.appendTurn(telegramUserId, { - role: 'user', - text: ctx.msg.text - }) - options.memoryStore.appendTurn(telegramUserId, { - role: 'assistant', - text: reply.text + memberId: member.id, + financeService, + householdConfigurationRepository: options.householdConfigurationRepository }) - options.logger?.info( - { - event: 'assistant.reply', + if (paymentProposal.status === 'clarification') { + await ctx.reply(t.paymentClarification) + return + } + + if (paymentProposal.status === 'unsupported_currency') { + await ctx.reply(t.paymentUnsupportedCurrency) + return + } + + if (paymentProposal.status === 'no_balance') { + await ctx.reply(t.paymentNoBalance) + return + } + + if (paymentProposal.status === 'proposal') { + await options.promptRepository.upsertPendingAction({ + telegramUserId, + telegramChatId, + action: ASSISTANT_PAYMENT_ACTION, + payload: { + ...paymentProposal.payload + }, + expiresAt: null + }) + + const amount = Money.fromMinor( + BigInt(paymentProposal.payload.amountMinor), + paymentProposal.payload.currency + ) + const proposalText = t.paymentProposal( + paymentProposal.payload.kind, + amount.toMajorString(), + amount.currency + ) + options.memoryStore.appendTurn(telegramUserId, { + role: 'user', + text: ctx.msg.text + }) + options.memoryStore.appendTurn(telegramUserId, { + role: 'assistant', + text: proposalText + }) + + await ctx.reply(proposalText, { + reply_markup: paymentProposalReplyMarkup(locale, paymentProposal.payload.proposalId) + }) + return + } + + if (!options.assistant) { + await ctx.reply(t.unavailable) + return + } + + const memory = options.memoryStore.get(telegramUserId) + const typingIndicator = startTypingIndicator(ctx) + let pendingReply: PendingAssistantReply | null = null + + try { + const householdContext = await buildHouseholdContext({ + householdId: member.householdId, + memberId: member.id, + memberDisplayName: member.displayName, + locale, + householdConfigurationRepository: options.householdConfigurationRepository, + financeService + }) + pendingReply = await sendAssistantProcessingReply(ctx, t.processing) + const reply = await options.assistant.respond({ + locale, + householdContext, + memorySummary: memory.summary, + recentTurns: memory.turns, + userMessage: ctx.msg.text + }) + + options.usageTracker.record({ householdId: member.householdId, telegramUserId, - inputTokens: reply.usage.inputTokens, - outputTokens: reply.usage.outputTokens, - totalTokens: reply.usage.totalTokens - }, - 'DM assistant reply generated' - ) + displayName: member.displayName, + usage: reply.usage + }) + options.memoryStore.appendTurn(telegramUserId, { + role: 'user', + text: ctx.msg.text + }) + options.memoryStore.appendTurn(telegramUserId, { + role: 'assistant', + text: reply.text + }) - await finalizeAssistantReply(ctx, pendingReply, reply.text) + options.logger?.info( + { + event: 'assistant.reply', + householdId: member.householdId, + telegramUserId, + inputTokens: reply.usage.inputTokens, + outputTokens: reply.usage.outputTokens, + totalTokens: reply.usage.totalTokens + }, + 'DM assistant reply generated' + ) + + await finalizeAssistantReply(ctx, pendingReply, reply.text) + } catch (error) { + options.logger?.error( + { + event: 'assistant.reply_failed', + householdId: member.householdId, + telegramUserId, + error + }, + 'DM assistant reply failed' + ) + await finalizeAssistantReply(ctx, pendingReply, t.unavailable) + } finally { + typingIndicator.stop() + } } catch (error) { - options.logger?.error( - { - event: 'assistant.reply_failed', + if (dedupeClaim) { + await dedupeClaim.repository.releaseMessage({ householdId: member.householdId, - telegramUserId, - error - }, - 'DM assistant reply failed' - ) - await finalizeAssistantReply(ctx, pendingReply, t.unavailable) + source: DM_ASSISTANT_MESSAGE_SOURCE, + sourceMessageKey: dedupeClaim.updateId + }) + } + + throw error } }) } diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index 7d71fc2..0dbf377 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -15,6 +15,7 @@ import { createDbAnonymousFeedbackRepository, createDbFinanceRepository, createDbHouseholdConfigurationRepository, + createDbProcessedBotMessageRepository, createDbReminderDispatchRepository, createDbTelegramPendingActionRepository } from '@household/adapters-db' @@ -85,7 +86,9 @@ const bot = createTelegramBot( getLogger('telegram'), householdConfigurationRepositoryClient?.repository ) -const webhookHandler = webhookCallback(bot, 'std/http') +const webhookHandler = webhookCallback(bot, 'std/http', { + onTimeout: 'return' +}) const financeRepositoryClients = new Map>() const financeServices = new Map>() const paymentConfirmationServices = new Map< @@ -110,6 +113,10 @@ const telegramPendingActionRepositoryClient = runtime.databaseUrl && (runtime.anonymousFeedbackEnabled || runtime.assistantEnabled) ? createDbTelegramPendingActionRepository(runtime.databaseUrl!) : null +const processedBotMessageRepositoryClient = + runtime.databaseUrl && runtime.assistantEnabled + ? createDbProcessedBotMessageRepository(runtime.databaseUrl!) + : null const assistantMemoryStore = createInMemoryAssistantConversationMemoryStore( runtime.assistantMemoryMaxTurns ) @@ -203,6 +210,10 @@ if (telegramPendingActionRepositoryClient) { shutdownTasks.push(telegramPendingActionRepositoryClient.close) } +if (processedBotMessageRepositoryClient) { + shutdownTasks.push(processedBotMessageRepositoryClient.close) +} + if (runtime.databaseUrl && householdConfigurationRepositoryClient) { const purchaseRepositoryClient = createPurchaseMessageRepository(runtime.databaseUrl!) shutdownTasks.push(purchaseRepositoryClient.close) @@ -366,21 +377,40 @@ if ( householdConfigurationRepositoryClient && telegramPendingActionRepositoryClient ) { - registerDmAssistant({ - bot, - householdConfigurationRepository: householdConfigurationRepositoryClient.repository, - promptRepository: telegramPendingActionRepositoryClient.repository, - financeServiceForHousehold, - memoryStore: assistantMemoryStore, - rateLimiter: assistantRateLimiter, - usageTracker: assistantUsageTracker, - ...(conversationalAssistant - ? { - assistant: conversationalAssistant - } - : {}), - logger: getLogger('dm-assistant') - }) + if (processedBotMessageRepositoryClient) { + registerDmAssistant({ + bot, + householdConfigurationRepository: householdConfigurationRepositoryClient.repository, + messageProcessingRepository: processedBotMessageRepositoryClient.repository, + promptRepository: telegramPendingActionRepositoryClient.repository, + financeServiceForHousehold, + memoryStore: assistantMemoryStore, + rateLimiter: assistantRateLimiter, + usageTracker: assistantUsageTracker, + ...(conversationalAssistant + ? { + assistant: conversationalAssistant + } + : {}), + logger: getLogger('dm-assistant') + }) + } else { + registerDmAssistant({ + bot, + householdConfigurationRepository: householdConfigurationRepositoryClient.repository, + promptRepository: telegramPendingActionRepositoryClient.repository, + financeServiceForHousehold, + memoryStore: assistantMemoryStore, + rateLimiter: assistantRateLimiter, + usageTracker: assistantUsageTracker, + ...(conversationalAssistant + ? { + assistant: conversationalAssistant + } + : {}), + logger: getLogger('dm-assistant') + }) + } } const server = createBotWebhookServer({ diff --git a/apps/bot/src/openai-purchase-interpreter.test.ts b/apps/bot/src/openai-purchase-interpreter.test.ts index 99fc5c8..9c4a7e9 100644 --- a/apps/bot/src/openai-purchase-interpreter.test.ts +++ b/apps/bot/src/openai-purchase-interpreter.test.ts @@ -1,6 +1,7 @@ import { describe, expect, test } from 'bun:test' import { + buildPurchaseInterpretationInput, createOpenAiPurchaseInterpreter, type PurchaseInterpretation } from './openai-purchase-interpreter' @@ -15,6 +16,22 @@ function successfulResponse(payload: unknown): Response { } describe('createOpenAiPurchaseInterpreter', () => { + test('includes clarification context when provided', () => { + expect( + buildPurchaseInterpretationInput('лари', { + recentMessages: ['Купил сосисоны, отдал 45 кровных'] + }) + ).toBe( + [ + 'Recent relevant messages from the same sender in this purchase topic:', + '1. Купил сосисоны, отдал 45 кровных', + '', + 'Latest message to interpret:', + 'лари' + ].join('\n') + ) + }) + test('parses nested responses api content output', async () => { const interpreter = createOpenAiPurchaseInterpreter('test-key', 'gpt-5-mini') expect(interpreter).toBeDefined() @@ -139,4 +156,48 @@ describe('createOpenAiPurchaseInterpreter', () => { globalThis.fetch = originalFetch } }) + + test('defaults omitted purchase currency to the household currency', async () => { + const interpreter = createOpenAiPurchaseInterpreter('test-key', 'gpt-5-mini') + expect(interpreter).toBeDefined() + + const originalFetch = globalThis.fetch + globalThis.fetch = (async () => + successfulResponse({ + output: [ + { + content: [ + { + text: JSON.stringify({ + decision: 'clarification', + amountMinor: '4500', + currency: null, + itemDescription: 'сосисоны', + confidence: 85, + clarificationQuestion: 'В какой валюте 45?' + }) + } + ] + } + ] + })) as unknown as typeof fetch + + try { + const result = await interpreter!('Купил сосисоны, отдал 45 кровных', { + defaultCurrency: 'GEL' + }) + + expect(result).toEqual({ + decision: 'purchase', + amountMinor: 4500n, + currency: 'GEL', + itemDescription: 'сосисоны', + confidence: 85, + parserMode: 'llm', + clarificationQuestion: null + }) + } finally { + globalThis.fetch = originalFetch + } + }) }) diff --git a/apps/bot/src/openai-purchase-interpreter.ts b/apps/bot/src/openai-purchase-interpreter.ts index 3690651..11c82d3 100644 --- a/apps/bot/src/openai-purchase-interpreter.ts +++ b/apps/bot/src/openai-purchase-interpreter.ts @@ -12,10 +12,15 @@ export interface PurchaseInterpretation { clarificationQuestion: string | null } +export interface PurchaseClarificationContext { + recentMessages: readonly string[] +} + export type PurchaseMessageInterpreter = ( rawText: string, options: { defaultCurrency: 'GEL' | 'USD' + clarificationContext?: PurchaseClarificationContext } ) => Promise @@ -51,6 +56,49 @@ function normalizeConfidence(value: number): number { return Math.max(0, Math.min(100, Math.round(scaled))) } +function resolveMissingCurrency(input: { + decision: PurchaseInterpretationDecision + amountMinor: bigint | null + currency: 'GEL' | 'USD' | null + itemDescription: string | null + defaultCurrency: 'GEL' | 'USD' +}): 'GEL' | 'USD' | null { + if (input.currency !== null) { + return input.currency + } + + if ( + input.decision === 'not_purchase' || + input.amountMinor === null || + input.itemDescription === null + ) { + return null + } + + return input.defaultCurrency +} + +export function buildPurchaseInterpretationInput( + rawText: string, + clarificationContext?: PurchaseClarificationContext +): string { + if (!clarificationContext || clarificationContext.recentMessages.length === 0) { + return rawText + } + + const history = clarificationContext.recentMessages + .map((message, index) => `${index + 1}. ${message}`) + .join('\n') + + return [ + 'Recent relevant messages from the same sender in this purchase topic:', + history, + '', + 'Latest message to interpret:', + rawText + ].join('\n') +} + export function createOpenAiPurchaseInterpreter( apiKey: string | undefined, model: string @@ -72,9 +120,12 @@ export function createOpenAiPurchaseInterpreter( { role: 'system', content: [ - 'You classify a single Telegram message from a household shared-purchases topic.', - 'Decide whether the message is a real shared purchase, needs clarification, or is not a shared purchase at all.', - `The household default currency is ${options.defaultCurrency}, but do not assume that omitted currency means ${options.defaultCurrency}.`, + 'You classify a purchase candidate from a household shared-purchases topic.', + 'Decide whether the latest message is a real shared purchase, needs clarification, or is not a shared purchase at all.', + `The household default currency is ${options.defaultCurrency}. If a real purchase clearly omits currency, use ${options.defaultCurrency}.`, + 'If recent messages from the same sender are provided, treat them as clarification context for the latest message.', + 'If the latest message is a complete standalone purchase on its own, ignore the earlier clarification context.', + 'If the latest message answers a previous clarification, combine it with the earlier messages to resolve the purchase.', 'Use clarification when the amount, currency, item, or overall intent is missing or uncertain.', 'Return a clarification question in the same language as the user message when clarification is needed.', 'Return only JSON that matches the schema.' @@ -82,7 +133,7 @@ export function createOpenAiPurchaseInterpreter( }, { role: 'user', - content: rawText + content: buildPurchaseInterpretationInput(rawText, options.clarificationContext) } ], text: { @@ -165,19 +216,35 @@ export function createOpenAiPurchaseInterpreter( return null } + const amountMinor = asOptionalBigInt(parsedJson.amountMinor) + const itemDescription = normalizeOptionalText(parsedJson.itemDescription) + const currency = resolveMissingCurrency({ + decision: parsedJson.decision, + amountMinor, + currency: normalizeCurrency(parsedJson.currency), + itemDescription, + defaultCurrency: options.defaultCurrency + }) + const decision = + parsedJson.decision === 'clarification' && + amountMinor !== null && + currency !== null && + itemDescription + ? 'purchase' + : parsedJson.decision const clarificationQuestion = normalizeOptionalText(parsedJson.clarificationQuestion) - if (parsedJson.decision === 'clarification' && !clarificationQuestion) { + if (decision === 'clarification' && !clarificationQuestion) { return null } return { - decision: parsedJson.decision, - amountMinor: asOptionalBigInt(parsedJson.amountMinor), - currency: normalizeCurrency(parsedJson.currency), - itemDescription: normalizeOptionalText(parsedJson.itemDescription), + decision, + amountMinor, + currency, + itemDescription, confidence: normalizeConfidence(parsedJson.confidence), parserMode: 'llm', - clarificationQuestion + clarificationQuestion: decision === 'clarification' ? clarificationQuestion : null } } } diff --git a/apps/bot/src/purchase-topic-ingestion.test.ts b/apps/bot/src/purchase-topic-ingestion.test.ts index ad30bd4..06797be 100644 --- a/apps/bot/src/purchase-topic-ingestion.test.ts +++ b/apps/bot/src/purchase-topic-ingestion.test.ts @@ -447,8 +447,16 @@ describe('registerPurchaseTopicIngestion', () => { await bot.handleUpdate(purchaseUpdate('Bought toilet paper 30 gel') as never) - expect(calls).toHaveLength(2) + expect(calls).toHaveLength(3) expect(calls[0]).toMatchObject({ + method: 'sendChatAction', + payload: { + chat_id: Number(config.householdChatId), + action: 'typing', + message_thread_id: config.purchaseTopicId + } + }) + expect(calls[1]).toMatchObject({ method: 'sendMessage', payload: { chat_id: Number(config.householdChatId), @@ -458,11 +466,11 @@ describe('registerPurchaseTopicIngestion', () => { } } }) - expect(calls[1]).toMatchObject({ + expect(calls[2]).toMatchObject({ method: 'editMessageText', payload: { chat_id: Number(config.householdChatId), - message_id: 1, + message_id: 2, text: 'I think this shared purchase was: toilet paper - 30.00 GEL. Confirm or cancel below.', reply_markup: { inline_keyboard: [ diff --git a/apps/bot/src/purchase-topic-ingestion.ts b/apps/bot/src/purchase-topic-ingestion.ts index 0734e35..073e70a 100644 --- a/apps/bot/src/purchase-topic-ingestion.ts +++ b/apps/bot/src/purchase-topic-ingestion.ts @@ -1,5 +1,5 @@ import { instantFromEpochSeconds, instantToDate, Money, type Instant } from '@household/domain' -import { and, eq } from 'drizzle-orm' +import { and, desc, eq } from 'drizzle-orm' import type { Bot, Context } from 'grammy' import type { Logger } from '@household/observability' import type { @@ -13,6 +13,7 @@ import type { PurchaseInterpretation, PurchaseMessageInterpreter } from './openai-purchase-interpreter' +import { startTypingIndicator } from './telegram-chat-action' const PURCHASE_CONFIRM_CALLBACK_PREFIX = 'purchase:confirm:' const PURCHASE_CANCEL_CALLBACK_PREFIX = 'purchase:cancel:' @@ -146,6 +147,9 @@ interface PurchasePersistenceDecision { needsReview: boolean } +const CLARIFICATION_CONTEXT_MAX_AGE_MS = 30 * 60_000 +const MAX_CLARIFICATION_CONTEXT_MESSAGES = 3 + function normalizeInterpretation( interpretation: PurchaseInterpretation | null, parserError: string | null @@ -459,6 +463,47 @@ export function createPurchaseMessageRepository(databaseUrl: string): { prepare: false }) + async function getClarificationContext( + record: PurchaseTopicRecord + ): Promise { + const rows = await db + .select({ + rawText: schema.purchaseMessages.rawText, + messageSentAt: schema.purchaseMessages.messageSentAt, + ingestedAt: schema.purchaseMessages.ingestedAt + }) + .from(schema.purchaseMessages) + .where( + and( + eq(schema.purchaseMessages.householdId, record.householdId), + eq(schema.purchaseMessages.senderTelegramUserId, record.senderTelegramUserId), + eq(schema.purchaseMessages.telegramThreadId, record.threadId), + eq(schema.purchaseMessages.processingStatus, 'clarification_needed') + ) + ) + .orderBy( + desc(schema.purchaseMessages.messageSentAt), + desc(schema.purchaseMessages.ingestedAt) + ) + .limit(MAX_CLARIFICATION_CONTEXT_MESSAGES) + + const currentMessageTimestamp = instantToDate(record.messageSentAt).getTime() + const recentMessages = rows + .filter((row) => { + const referenceTimestamp = (row.messageSentAt ?? row.ingestedAt)?.getTime() + return ( + referenceTimestamp !== undefined && + currentMessageTimestamp - referenceTimestamp >= 0 && + currentMessageTimestamp - referenceTimestamp <= CLARIFICATION_CONTEXT_MAX_AGE_MS + ) + }) + .reverse() + .map((row) => row.rawText.trim()) + .filter((value) => value.length > 0) + + return recentMessages.length > 0 ? recentMessages : undefined + } + async function getStoredMessage( purchaseMessageId: string ): Promise { @@ -595,10 +640,18 @@ export function createPurchaseMessageRepository(databaseUrl: string): { const senderMemberId = matchedMember[0]?.id ?? null let parserError: string | null = null + const clarificationContext = interpreter ? await getClarificationContext(record) : undefined const interpretation = interpreter ? await interpreter(record.rawText, { - defaultCurrency: defaultCurrency ?? 'GEL' + defaultCurrency: defaultCurrency ?? 'GEL', + ...(clarificationContext + ? { + clarificationContext: { + recentMessages: clarificationContext + } + } + : {}) }).catch((error) => { parserError = error instanceof Error ? error.message : 'Unknown interpreter error' return null @@ -988,6 +1041,8 @@ export function registerPurchaseTopicIngestion( return } + const typingIndicator = options.interpreter ? startTypingIndicator(ctx) : null + try { const pendingReply = options.interpreter ? await sendPurchaseProcessingReply(ctx, getBotTranslations('en').purchase.processing) @@ -1006,6 +1061,8 @@ export function registerPurchaseTopicIngestion( }, 'Failed to ingest purchase topic message' ) + } finally { + typingIndicator?.stop() } }) } @@ -1049,6 +1106,8 @@ export function registerConfiguredPurchaseTopicIngestion( return } + const typingIndicator = options.interpreter ? startTypingIndicator(ctx) : null + try { const billingSettings = await householdConfigurationRepository.getHouseholdBillingSettings( record.householdId @@ -1080,6 +1139,8 @@ export function registerConfiguredPurchaseTopicIngestion( }, 'Failed to ingest purchase topic message' ) + } finally { + typingIndicator?.stop() } }) } diff --git a/apps/bot/src/telegram-chat-action.ts b/apps/bot/src/telegram-chat-action.ts new file mode 100644 index 0000000..d92c316 --- /dev/null +++ b/apps/bot/src/telegram-chat-action.ts @@ -0,0 +1,55 @@ +import type { Context } from 'grammy' + +const TYPING_REFRESH_INTERVAL_MS = 4_000 + +export interface ActiveChatAction { + stop(): void +} + +export function startTypingIndicator(ctx: Context): ActiveChatAction { + const chatId = ctx.chat?.id + if (!chatId) { + return { + stop() {} + } + } + + const messageThreadId = + ctx.msg && 'message_thread_id' in ctx.msg ? ctx.msg.message_thread_id : undefined + + let active = true + + const sendTypingAction = async () => { + if (!active) { + return + } + + const options = + messageThreadId !== undefined + ? { + message_thread_id: messageThreadId + } + : undefined + + try { + await ctx.api.sendChatAction(chatId, 'typing', options) + } catch {} + } + + void sendTypingAction() + + const interval = setInterval(() => { + void sendTypingAction() + }, TYPING_REFRESH_INTERVAL_MS) + + if (typeof interval.unref === 'function') { + interval.unref() + } + + return { + stop() { + active = false + clearInterval(interval) + } + } +} diff --git a/packages/adapters-db/src/index.ts b/packages/adapters-db/src/index.ts index 0ea022a..48073d1 100644 --- a/packages/adapters-db/src/index.ts +++ b/packages/adapters-db/src/index.ts @@ -1,5 +1,6 @@ export { createDbAnonymousFeedbackRepository } from './anonymous-feedback-repository' export { createDbFinanceRepository } from './finance-repository' export { createDbHouseholdConfigurationRepository } from './household-config-repository' +export { createDbProcessedBotMessageRepository } from './processed-bot-message-repository' export { createDbReminderDispatchRepository } from './reminder-dispatch-repository' export { createDbTelegramPendingActionRepository } from './telegram-pending-action-repository' diff --git a/packages/adapters-db/src/processed-bot-message-repository.ts b/packages/adapters-db/src/processed-bot-message-repository.ts new file mode 100644 index 0000000..3b40374 --- /dev/null +++ b/packages/adapters-db/src/processed-bot-message-repository.ts @@ -0,0 +1,58 @@ +import { and, eq } from 'drizzle-orm' + +import { createDbClient, schema } from '@household/db' +import type { ProcessedBotMessageRepository } from '@household/ports' + +export function createDbProcessedBotMessageRepository(databaseUrl: string): { + repository: ProcessedBotMessageRepository + close: () => Promise +} { + const { db, queryClient } = createDbClient(databaseUrl, { + max: 3, + prepare: false + }) + + const repository: ProcessedBotMessageRepository = { + async claimMessage(input) { + const rows = await db + .insert(schema.processedBotMessages) + .values({ + householdId: input.householdId, + source: input.source, + sourceMessageKey: input.sourceMessageKey, + payloadHash: input.payloadHash ?? null + }) + .onConflictDoNothing({ + target: [ + schema.processedBotMessages.householdId, + schema.processedBotMessages.source, + schema.processedBotMessages.sourceMessageKey + ] + }) + .returning({ id: schema.processedBotMessages.id }) + + return { + claimed: rows.length > 0 + } + }, + + async releaseMessage(input) { + await db + .delete(schema.processedBotMessages) + .where( + and( + eq(schema.processedBotMessages.householdId, input.householdId), + eq(schema.processedBotMessages.source, input.source), + eq(schema.processedBotMessages.sourceMessageKey, input.sourceMessageKey) + ) + ) + } + } + + return { + repository, + close: async () => { + await queryClient.end({ timeout: 5 }) + } + } +} diff --git a/packages/ports/src/index.ts b/packages/ports/src/index.ts index e4597f4..eeb4341 100644 --- a/packages/ports/src/index.ts +++ b/packages/ports/src/index.ts @@ -6,6 +6,12 @@ export { type ReminderTarget, type ReminderType } from './reminders' +export type { + ClaimProcessedBotMessageInput, + ClaimProcessedBotMessageResult, + ProcessedBotMessageRepository, + ReleaseProcessedBotMessageInput +} from './processed-bot-messages' export { HOUSEHOLD_TOPIC_ROLES, type HouseholdConfigurationRepository, diff --git a/packages/ports/src/processed-bot-messages.ts b/packages/ports/src/processed-bot-messages.ts new file mode 100644 index 0000000..6c8076e --- /dev/null +++ b/packages/ports/src/processed-bot-messages.ts @@ -0,0 +1,21 @@ +export interface ClaimProcessedBotMessageInput { + householdId: string + source: string + sourceMessageKey: string + payloadHash?: string | null +} + +export interface ClaimProcessedBotMessageResult { + claimed: boolean +} + +export interface ReleaseProcessedBotMessageInput { + householdId: string + source: string + sourceMessageKey: string +} + +export interface ProcessedBotMessageRepository { + claimMessage(input: ClaimProcessedBotMessageInput): Promise + releaseMessage(input: ReleaseProcessedBotMessageInput): Promise +}