From f38ee499ae3ba0a6315e35c578f3d8b33b389a34 Mon Sep 17 00:00:00 2001 From: whekin Date: Sat, 14 Mar 2026 13:33:57 +0400 Subject: [PATCH] feat(bot): unified topic processor replacing router+interpreter stack MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace 3-layer architecture (gpt-5-nano router + gpt-4o-mini interpreter) with single unified topic processor (gpt-4o-mini) for simplified message handling. New components: - HouseholdContextCache: TTL-based caching (5 min) for household config data - TopicProcessor: Unified classification + parsing with structured JSON output Key changes: - Renamed ASSISTANT_ROUTER_MODEL → TOPIC_PROCESSOR_MODEL - Added TOPIC_PROCESSOR_TIMEOUT_MS (default 10s) - Refactored save() → saveWithInterpretation() for pre-parsed interpretations - Removed deprecated createOpenAiTopicMessageRouter and ~300 lines legacy code - Fixed typing indicator to only start when needed (purchase routes) - Fixed amount formatting: convert minor units to major for rawText Routes: silent, chat_reply, purchase, purchase_clarification, payment, payment_clarification, topic_helper, dismiss_workflow All 212 bot tests pass. Typecheck, lint, format, build clean. --- .env.example | 3 +- README.md | 4 +- apps/bot/src/config.ts | 10 +- apps/bot/src/dm-assistant.test.ts | 28 +- apps/bot/src/household-context-cache.ts | 48 ++ apps/bot/src/index.ts | 30 +- apps/bot/src/payment-topic-ingestion.test.ts | 100 ++- apps/bot/src/payment-topic-ingestion.ts | 568 ++++++++++-------- apps/bot/src/purchase-topic-ingestion.test.ts | 94 ++- apps/bot/src/purchase-topic-ingestion.ts | 440 +++++++++++--- apps/bot/src/topic-message-router.test.ts | 282 ++++----- apps/bot/src/topic-message-router.ts | 266 -------- apps/bot/src/topic-processor.ts | 533 ++++++++++++++++ docs/runbooks/iac-terraform.md | 2 +- 14 files changed, 1554 insertions(+), 854 deletions(-) create mode 100644 apps/bot/src/household-context-cache.ts create mode 100644 apps/bot/src/topic-processor.ts diff --git a/.env.example b/.env.example index e437aab..945f69c 100644 --- a/.env.example +++ b/.env.example @@ -16,9 +16,10 @@ MINI_APP_ALLOWED_ORIGINS=http://localhost:5173 # Parsing / AI OPENAI_API_KEY=your-openai-api-key -PARSER_MODEL=gpt-4o-mini PURCHASE_PARSER_MODEL=gpt-4o-mini ASSISTANT_MODEL=gpt-4o-mini +TOPIC_PROCESSOR_MODEL=gpt-4o-mini +TOPIC_PROCESSOR_TIMEOUT_MS=10000 ASSISTANT_TIMEOUT_MS=20000 ASSISTANT_MEMORY_MAX_TURNS=12 ASSISTANT_RATE_LIMIT_BURST=5 diff --git a/README.md b/README.md index 2d72171..7a5a415 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This is not a toy Telegram bot repo with a thin webhook and some string parsing. - deterministic money-safe settlement logic with integer minor-unit math - a hexagonal TypeScript monorepo with explicit domain / application / ports / adapter boundaries - real operational concerns: idempotency, onboarding flows, localized UX, bot topic setup, reminder scheduling, Terraform-managed infrastructure -- a layered LLM architecture: a cheap first-pass topic router (`gpt-5-nano`) decides whether to stay silent, reply lightly, continue a workflow, or invoke a heavier helper +- a unified topic processor (`gpt-4o-mini`) that classifies messages and extracts structured data in a single LLM call - a product that mixes structured command flows with LLM-assisted parsing while keeping writes deterministic ## Current Product Scope @@ -139,7 +139,7 @@ For a fuller setup walkthrough, see the [development setup runbook](docs/runbook Some product choices here are intentional: - LLMs help interpret messy purchase/payment phrasing, but final writes are still explicit, structured, and confirmable. -- The bot uses a separate first-pass AI router, defaulting to `gpt-5-nano`, to classify topic messages before invoking the fuller assistant or parser models. That keeps casual chatter, jokes, and ambiguous messages from unnecessarily hitting heavier paths, while still letting the bot respond naturally when it is directly addressed. +- The bot uses a unified topic processor (`gpt-4o-mini`) that classifies messages and extracts purchase/payment data in a single call. This simplifies the architecture while keeping casual chatter from hitting heavier paths. - Topic-specific ingestion stays separate from the general assistant so finance actions do not degrade into vague chat behavior. - Telegram UX is treated as a real product surface: onboarding, confirmation buttons, topic setup, tagged replies, and localization are part of the design, not afterthoughts. - Infra is versioned alongside the app so deployability, alerts, and runtime configuration are reviewable in the same repo. diff --git a/apps/bot/src/config.ts b/apps/bot/src/config.ts index 31752d6..52f00a7 100644 --- a/apps/bot/src/config.ts +++ b/apps/bot/src/config.ts @@ -17,7 +17,8 @@ export interface BotRuntimeConfig { openaiApiKey?: string purchaseParserModel: string assistantModel: string - assistantRouterModel: string + topicProcessorModel: string + topicProcessorTimeoutMs: number assistantTimeoutMs: number assistantMemoryMaxTurns: number assistantRateLimitBurst: number @@ -129,7 +130,12 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu reminderJobsEnabled, purchaseParserModel: env.PURCHASE_PARSER_MODEL?.trim() || 'gpt-4o-mini', assistantModel: env.ASSISTANT_MODEL?.trim() || 'gpt-4o-mini', - assistantRouterModel: env.ASSISTANT_ROUTER_MODEL?.trim() || 'gpt-5-nano', + topicProcessorModel: env.TOPIC_PROCESSOR_MODEL?.trim() || 'gpt-4o-mini', + topicProcessorTimeoutMs: parsePositiveInteger( + env.TOPIC_PROCESSOR_TIMEOUT_MS, + 10_000, + 'TOPIC_PROCESSOR_TIMEOUT_MS' + ), assistantTimeoutMs: parsePositiveInteger( env.ASSISTANT_TIMEOUT_MS, 20_000, diff --git a/apps/bot/src/dm-assistant.test.ts b/apps/bot/src/dm-assistant.test.ts index e4ecf23..8b69f9e 100644 --- a/apps/bot/src/dm-assistant.test.ts +++ b/apps/bot/src/dm-assistant.test.ts @@ -469,6 +469,9 @@ function createPurchaseRepository(): PurchaseMessageIngestionRepository { async hasClarificationContext(record) { return clarificationKeys.has(key(record)) }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async save(record) { const threadKey = key(record) @@ -1414,25 +1417,12 @@ Confirm or cancel below.`, }) }) - test('reuses the purchase-topic route instead of calling the shared router twice', async () => { + test('uses topic processor for classification and assistant for response', async () => { const bot = createTestBot() const calls: Array<{ method: string; payload: unknown }> = [] let assistantCalls = 0 - let routerCalls = 0 + let processorCalls = 0 const householdConfigurationRepository = createBoundHouseholdRepository('purchase') - const topicRouter = async () => { - routerCalls += 1 - - return { - route: 'topic_helper' as const, - replyText: null, - helperKind: 'assistant' as const, - shouldStartTyping: true, - shouldClearWorkflow: false, - confidence: 96, - reason: 'question' - } - } bot.api.config.use(async (_prev, method, payload) => { calls.push({ method, payload }) @@ -1463,7 +1453,10 @@ Confirm or cancel below.`, householdConfigurationRepository, createPurchaseRepository(), { - router: topicRouter + topicProcessor: async () => { + processorCalls += 1 + return { route: 'topic_helper', reason: 'test' } + } } ) @@ -1482,7 +1475,6 @@ Confirm or cancel below.`, } } }, - topicRouter, purchaseRepository: createPurchaseRepository(), purchaseInterpreter: async () => null, householdConfigurationRepository, @@ -1500,7 +1492,7 @@ Confirm or cancel below.`, await bot.handleUpdate(topicMentionUpdate('@household_test_bot how is life?') as never) - expect(routerCalls).toBe(1) + expect(processorCalls).toBe(1) expect(assistantCalls).toBe(1) expect(calls).toEqual( expect.arrayContaining([ diff --git a/apps/bot/src/household-context-cache.ts b/apps/bot/src/household-context-cache.ts new file mode 100644 index 0000000..3806428 --- /dev/null +++ b/apps/bot/src/household-context-cache.ts @@ -0,0 +1,48 @@ +import type { BotLocale } from './i18n' + +export interface CachedHouseholdContext { + householdContext: string | null + assistantTone: string | null + defaultCurrency: 'GEL' | 'USD' + locale: BotLocale + cachedAt: number +} + +interface CacheEntry { + context: CachedHouseholdContext + expiresAt: number +} + +export class HouseholdContextCache { + private cache = new Map() + + constructor(private ttlMs: number = 5 * 60_000) {} + + async get( + householdId: string, + loader: () => Promise + ): Promise { + const now = Date.now() + const entry = this.cache.get(householdId) + + if (entry && entry.expiresAt > now) { + return entry.context + } + + const context = await loader() + this.cache.set(householdId, { + context, + expiresAt: now + this.ttlMs + }) + + return context + } + + invalidate(householdId: string): void { + this.cache.delete(householdId) + } + + clear(): void { + this.cache.clear() + } +} diff --git a/apps/bot/src/index.ts b/apps/bot/src/index.ts index f9a13fe..bca1d4b 100644 --- a/apps/bot/src/index.ts +++ b/apps/bot/src/index.ts @@ -36,7 +36,8 @@ import { getBotRuntimeConfig } from './config' import { registerHouseholdSetupCommands } from './household-setup' import { createOpenAiChatAssistant } from './openai-chat-assistant' import { createOpenAiPurchaseInterpreter } from './openai-purchase-interpreter' -import { createOpenAiTopicMessageRouter } from './topic-message-router' +import { createTopicProcessor } from './topic-processor' +import { HouseholdContextCache } from './household-context-cache' import { createPurchaseMessageRepository, registerConfiguredPurchaseTopicIngestion @@ -153,11 +154,12 @@ const conversationalAssistant = createOpenAiChatAssistant( runtime.assistantModel, runtime.assistantTimeoutMs ) -const topicMessageRouter = createOpenAiTopicMessageRouter( +const topicProcessor = createTopicProcessor( runtime.openaiApiKey, - runtime.assistantRouterModel, - Math.min(runtime.assistantTimeoutMs, 5_000) + runtime.topicProcessorModel, + runtime.topicProcessorTimeoutMs ) +const householdContextCache = new HouseholdContextCache() const anonymousFeedbackRepositoryClients = new Map< string, ReturnType @@ -254,9 +256,10 @@ if (purchaseRepositoryClient && householdConfigurationRepositoryClient) { householdConfigurationRepositoryClient.repository, purchaseRepositoryClient.repository, { - ...(topicMessageRouter + ...(topicProcessor ? { - router: topicMessageRouter, + topicProcessor, + contextCache: householdContextCache, memoryStore: assistantMemoryStore, ...(topicMessageHistoryRepositoryClient ? { @@ -281,9 +284,10 @@ if (purchaseRepositoryClient && householdConfigurationRepositoryClient) { financeServiceForHousehold, paymentConfirmationServiceForHousehold, { - ...(topicMessageRouter + ...(topicProcessor ? { - router: topicMessageRouter, + topicProcessor, + contextCache: householdContextCache, memoryStore: assistantMemoryStore, ...(topicMessageHistoryRepositoryClient ? { @@ -476,11 +480,6 @@ if ( assistant: conversationalAssistant } : {}), - ...(topicMessageRouter - ? { - topicRouter: topicMessageRouter - } - : {}), logger: getLogger('dm-assistant') }) } else { @@ -512,11 +511,6 @@ if ( assistant: conversationalAssistant } : {}), - ...(topicMessageRouter - ? { - topicRouter: topicMessageRouter - } - : {}), logger: getLogger('dm-assistant') }) } diff --git a/apps/bot/src/payment-topic-ingestion.test.ts b/apps/bot/src/payment-topic-ingestion.test.ts index 7de5bc9..712fade 100644 --- a/apps/bot/src/payment-topic-ingestion.test.ts +++ b/apps/bot/src/payment-topic-ingestion.test.ts @@ -10,6 +10,7 @@ import { resolveConfiguredPaymentTopicRecord, type PaymentTopicCandidate } from './payment-topic-ingestion' +import type { TopicProcessor } from './topic-processor' function candidate(overrides: Partial = {}): PaymentTopicCandidate { return { @@ -231,6 +232,39 @@ function createPaymentConfirmationService(): PaymentConfirmationService & { } } +// Mock topic processor that mimics LLM responses for testing +function createMockPaymentTopicProcessor( + route: 'payment' | 'silent' | 'topic_helper' | 'payment_clarification' | 'chat_reply' = 'payment' +): TopicProcessor { + return async () => { + if (route === 'silent') { + return { route: 'silent', reason: 'test' } + } + if (route === 'topic_helper') { + return { route: 'topic_helper', reason: 'test' } + } + if (route === 'chat_reply') { + return { route: 'chat_reply', replyText: 'Hello!', reason: 'test' } + } + if (route === 'payment_clarification') { + return { + route: 'payment_clarification', + clarificationQuestion: 'What kind of payment?', + reason: 'test' + } + } + // Default to payment route + return { + route: 'payment', + kind: 'rent', + amountMinor: '47250', + currency: 'GEL', + confidence: 95, + reason: 'test' + } + } +} + describe('resolveConfiguredPaymentTopicRecord', () => { test('returns record when the topic role is payments', () => { const record = resolveConfiguredPaymentTopicRecord(candidate(), { @@ -332,7 +366,8 @@ describe('registerConfiguredPaymentTopicIngestion', () => { createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: createMockPaymentTopicProcessor() } ) await bot.handleUpdate(paymentUpdate('за жилье закинул') as never) @@ -403,12 +438,36 @@ describe('registerConfiguredPaymentTopicIngestion', () => { const paymentConfirmationService = createPaymentConfirmationService() + // Smart mock that returns clarification for vague messages, payment for clear ones + const smartTopicProcessor: TopicProcessor = async (input) => { + const text = input.messageText.toLowerCase() + // Vague messages like "готово" (done) need clarification + if (text === 'готово' || text === 'done') { + return { + route: 'payment_clarification', + clarificationQuestion: + 'Пока не могу подтвердить эту оплату. Уточните, это аренда или коммуналка, и при необходимости напишите сумму и валюту.', + reason: 'test' + } + } + // Messages with rent keywords can proceed as payment + return { + route: 'payment', + kind: 'rent', + amountMinor: '47250', + currency: 'GEL', + confidence: 95, + reason: 'test' + } + } + registerConfiguredPaymentTopicIngestion( bot, createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: smartTopicProcessor } ) await bot.handleUpdate(paymentUpdate('готово') as never) @@ -481,14 +540,10 @@ describe('registerConfiguredPaymentTopicIngestion', () => { () => createFinanceService(), () => createPaymentConfirmationService(), { - router: async () => ({ - route: 'payment_followup', + topicProcessor: async () => ({ + route: 'dismiss_workflow', replyText: null, - helperKind: 'payment', - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 90, - reason: 'llm_followup_guess' + reason: 'test' }) } ) @@ -534,7 +589,8 @@ describe('registerConfiguredPaymentTopicIngestion', () => { createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: createMockPaymentTopicProcessor() } ) await bot.handleUpdate(paymentUpdate('за жилье закинул') as never) @@ -605,7 +661,8 @@ describe('registerConfiguredPaymentTopicIngestion', () => { createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: createMockPaymentTopicProcessor('silent') } ) await bot.handleUpdate(paymentUpdate('Так так)') as never) @@ -637,7 +694,8 @@ describe('registerConfiguredPaymentTopicIngestion', () => { createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: createMockPaymentTopicProcessor() } ) await bot.handleUpdate(paymentUpdate('/unsetup') as never) @@ -678,7 +736,8 @@ describe('registerConfiguredPaymentTopicIngestion', () => { createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: createMockPaymentTopicProcessor('topic_helper') } ) await bot.handleUpdate(paymentUpdate('@household_test_bot как жизнь?') as never) @@ -720,7 +779,8 @@ describe('registerConfiguredPaymentTopicIngestion', () => { createHouseholdRepository() as never, promptRepository, () => createFinanceService(), - () => paymentConfirmationService + () => paymentConfirmationService, + { topicProcessor: createMockPaymentTopicProcessor() } ) await bot.handleUpdate(paymentUpdate('@household_test_bot за жилье закинул') as never) @@ -765,13 +825,9 @@ describe('registerConfiguredPaymentTopicIngestion', () => { () => createFinanceService(), () => createPaymentConfirmationService(), { - router: async () => ({ + topicProcessor: async () => ({ route: 'chat_reply', replyText: 'Тут. Если это про оплату, разберёмся.', - helperKind: null, - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 94, reason: 'smalltalk' }) } @@ -831,13 +887,9 @@ describe('registerConfiguredPaymentTopicIngestion', () => { () => createFinanceService(), () => createPaymentConfirmationService(), { - router: async () => ({ + topicProcessor: async () => ({ route: 'dismiss_workflow', replyText: 'Окей, молчу.', - helperKind: null, - shouldStartTyping: false, - shouldClearWorkflow: true, - confidence: 97, reason: 'backoff' }) } diff --git a/apps/bot/src/payment-topic-ingestion.ts b/apps/bot/src/payment-topic-ingestion.ts index d3ead0c..dc8a6a3 100644 --- a/apps/bot/src/payment-topic-ingestion.ts +++ b/apps/bot/src/payment-topic-ingestion.ts @@ -1,5 +1,5 @@ import type { FinanceCommandService, PaymentConfirmationService } from '@household/application' -import { instantFromEpochSeconds, nowInstant, type Instant } from '@household/domain' +import { instantFromEpochSeconds, Money, nowInstant, type Instant } from '@household/domain' import type { Bot, Context } from 'grammy' import type { Logger } from '@household/observability' import type { @@ -12,7 +12,7 @@ import type { import { getBotTranslations, type BotLocale } from './i18n' import type { AssistantConversationMemoryStore } from './assistant-state' import { conversationMemoryKey } from './assistant-state' -import { buildConversationContext } from './conversation-orchestrator' + import { formatPaymentBalanceReplyText, formatPaymentProposalText, @@ -21,11 +21,7 @@ import { parsePaymentProposalPayload, synthesizePaymentConfirmationText } from './payment-proposals' -import { - cacheTopicMessageRoute, - getCachedTopicMessageRoute, - type TopicMessageRouter -} from './topic-message-router' +import type { TopicMessageRouter } from './topic-message-router' import { persistTopicHistoryMessage, telegramMessageIdFromMessage, @@ -240,89 +236,6 @@ async function persistIncomingTopicMessage( }) } -async function routePaymentTopicMessage(input: { - record: PaymentTopicRecord - locale: BotLocale - topicRole: 'payments' - isExplicitMention: boolean - isReplyToBot: boolean - activeWorkflow: 'payment_clarification' | 'payment_confirmation' | null - assistantContext: string | null - assistantTone: string | null - memoryStore: AssistantConversationMemoryStore | undefined - historyRepository: TopicMessageHistoryRepository | undefined - router: TopicMessageRouter | undefined -}) { - if (!input.router) { - return input.activeWorkflow - ? { - route: 'payment_followup' as const, - replyText: null, - helperKind: 'payment' as const, - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 75, - reason: 'legacy_payment_followup' - } - : { - route: 'payment_candidate' as const, - replyText: null, - helperKind: 'payment' as const, - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 75, - reason: 'legacy_payment_candidate' - } - } - - const conversationContext = await buildConversationContext({ - repository: input.historyRepository, - householdId: input.record.householdId, - telegramChatId: input.record.chatId, - telegramThreadId: input.record.threadId, - telegramUserId: input.record.senderTelegramUserId, - topicRole: input.topicRole, - activeWorkflow: input.activeWorkflow, - messageText: input.record.rawText, - explicitMention: input.isExplicitMention, - replyToBot: input.isReplyToBot, - directBotAddress: false, - memoryStore: input.memoryStore ?? { - get() { - return { summary: null, turns: [] } - }, - appendTurn() { - return { summary: null, turns: [] } - } - } - }) - - return input.router({ - locale: input.locale, - topicRole: input.topicRole, - messageText: input.record.rawText, - isExplicitMention: conversationContext.explicitMention, - isReplyToBot: conversationContext.replyToBot, - activeWorkflow: input.activeWorkflow, - engagementAssessment: conversationContext.engagement, - assistantContext: input.assistantContext, - assistantTone: input.assistantTone, - recentTurns: input.memoryStore?.get(memoryKeyForRecord(input.record)).turns ?? [], - recentThreadMessages: conversationContext.recentThreadMessages.map((message) => ({ - role: message.role, - speaker: message.speaker, - text: message.text, - threadId: message.threadId - })), - recentChatMessages: conversationContext.recentSessionMessages.map((message) => ({ - role: message.role, - speaker: message.speaker, - text: message.text, - threadId: message.threadId - })) - }) -} - export function buildPaymentAcknowledgement( locale: BotLocale, result: @@ -457,6 +370,8 @@ export function registerConfiguredPaymentTopicIngestion( paymentServiceForHousehold: (householdId: string) => PaymentConfirmationService, options: { router?: TopicMessageRouter + topicProcessor?: import('./topic-processor').TopicProcessor + contextCache?: import('./household-context-cache').HouseholdContextCache memoryStore?: AssistantConversationMemoryStore historyRepository?: TopicMessageHistoryRepository logger?: Logger @@ -632,196 +547,321 @@ export function registerConfiguredPaymentTopicIngestion( pending?.action === PAYMENT_TOPIC_CONFIRMATION_ACTION ? parsePaymentTopicConfirmationPayload(pending.payload) : null - const assistantConfig = await resolveAssistantConfig( - householdConfigurationRepository, - record.householdId - ) + + // Load household context (cached) + const householdContext = options.contextCache + ? await options.contextCache.get(record.householdId, async () => { + const billingSettings = + await householdConfigurationRepository.getHouseholdBillingSettings(record.householdId) + const assistantConfig = await resolveAssistantConfig( + householdConfigurationRepository, + record.householdId + ) + return { + householdContext: assistantConfig.assistantContext, + assistantTone: assistantConfig.assistantTone, + defaultCurrency: billingSettings.settlementCurrency, + locale: (await resolveTopicLocale(ctx, householdConfigurationRepository)) as + | 'en' + | 'ru', + cachedAt: Date.now() + } + }) + : { + householdContext: null as string | null, + assistantTone: null as string | null, + defaultCurrency: 'GEL' as const, + locale: 'en' as const, + cachedAt: Date.now() + } + const activeWorkflow = clarificationPayload && clarificationPayload.threadId === record.threadId ? 'payment_clarification' : confirmationPayload && confirmationPayload.telegramThreadId === record.threadId ? 'payment_confirmation' : null - const route = - getCachedTopicMessageRoute(ctx, 'payments') ?? - (await routePaymentTopicMessage({ - record, - locale, - topicRole: 'payments', - isExplicitMention: stripExplicitBotMention(ctx) !== null, - isReplyToBot: isReplyToBotMessage(ctx), - activeWorkflow, - assistantContext: assistantConfig.assistantContext, - assistantTone: assistantConfig.assistantTone, - memoryStore: options.memoryStore, - historyRepository: options.historyRepository, - router: options.router - })) - cacheTopicMessageRoute(ctx, 'payments', route) - if (route.route === 'silent') { - await next() - return - } + // Use topic processor if available + if (options.topicProcessor) { + const { buildConversationContext } = await import('./conversation-orchestrator') + const { stripExplicitBotMention } = await import('./telegram-mentions') - if (route.shouldClearWorkflow && activeWorkflow !== null) { - await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) - } - - if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { - if (route.replyText) { - await replyToPaymentMessage(ctx, route.replyText, undefined, { - repository: options.historyRepository, - record - }) - appendConversation(options.memoryStore, record, record.rawText, route.replyText) - } - return - } - - if (route.route === 'topic_helper') { - if ( - route.reason === 'context_reference' || - route.reason === 'engaged_context' || - route.reason === 'addressed' - ) { - await next() - return - } - - const financeService = financeServiceForHousehold(record.householdId) - const member = await financeService.getMemberByTelegramUserId(record.senderTelegramUserId) - if (!member) { - await next() - return - } - - const balanceReply = await maybeCreatePaymentBalanceReply({ - rawText: combinedText, + const conversationContext = await buildConversationContext({ + repository: options.historyRepository, householdId: record.householdId, - memberId: member.id, - financeService, - householdConfigurationRepository + telegramChatId: record.chatId, + telegramThreadId: record.threadId, + telegramUserId: record.senderTelegramUserId, + topicRole: 'payments', + activeWorkflow, + messageText: record.rawText, + explicitMention: stripExplicitBotMention(ctx) !== null, + replyToBot: isReplyToBotMessage(ctx), + directBotAddress: false, + memoryStore: options.memoryStore ?? { + get() { + return { summary: null, turns: [] } + }, + appendTurn() { + return { summary: null, turns: [] } + } + } }) - if (!balanceReply) { - await next() + const processorResult = await options.topicProcessor({ + locale: locale === 'ru' ? 'ru' : 'en', + topicRole: 'payments', + messageText: combinedText, + isExplicitMention: conversationContext.explicitMention, + isReplyToBot: conversationContext.replyToBot, + activeWorkflow, + defaultCurrency: householdContext.defaultCurrency, + householdContext: householdContext.householdContext, + assistantTone: householdContext.assistantTone, + householdMembers: [], + senderMemberId: null, + recentThreadMessages: conversationContext.recentThreadMessages.map((m) => ({ + role: m.role, + speaker: m.speaker, + text: m.text + })), + recentChatMessages: conversationContext.recentSessionMessages.map((m) => ({ + role: m.role, + speaker: m.speaker, + text: m.text + })), + recentTurns: conversationContext.recentTurns, + engagementAssessment: conversationContext.engagement + }) + + // Handle processor failure + if (!processorResult) { + const { botSleepsMessage } = await import('./topic-processor') + await replyToPaymentMessage( + ctx, + botSleepsMessage(locale === 'ru' ? 'ru' : 'en'), + undefined, + { + repository: options.historyRepository, + record + } + ) return } - const helperText = formatPaymentBalanceReplyText(locale, balanceReply) - await replyToPaymentMessage(ctx, helperText, undefined, { - repository: options.historyRepository, - record - }) - appendConversation(options.memoryStore, record, record.rawText, helperText) - return - } - - if (route.route !== 'payment_candidate' && route.route !== 'payment_followup') { - await next() - return - } - - const t = getBotTranslations(locale).payments - const financeService = financeServiceForHousehold(record.householdId) - const member = await financeService.getMemberByTelegramUserId(record.senderTelegramUserId) - - if (!member) { - await next() - return - } - - const proposal = await maybeCreatePaymentProposal({ - rawText: combinedText, - householdId: record.householdId, - memberId: member.id, - financeService, - householdConfigurationRepository - }) - - if (proposal.status === 'no_intent') { - if (route.route === 'payment_followup') { - await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) - } - await next() - return - } - - if (proposal.status === 'clarification') { - await promptRepository.upsertPendingAction({ - telegramUserId: record.senderTelegramUserId, - telegramChatId: record.chatId, - action: PAYMENT_TOPIC_CLARIFICATION_ACTION, - payload: { - threadId: record.threadId, - rawText: combinedText - }, - expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) - }) - - await replyToPaymentMessage(ctx, t.clarification, undefined, { - repository: options.historyRepository, - record - }) - appendConversation(options.memoryStore, record, record.rawText, t.clarification) - return - } - - await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) - - if (proposal.status === 'unsupported_currency') { - await replyToPaymentMessage(ctx, t.unsupportedCurrency, undefined, { - repository: options.historyRepository, - record - }) - appendConversation(options.memoryStore, record, record.rawText, t.unsupportedCurrency) - return - } - - if (proposal.status === 'no_balance') { - await replyToPaymentMessage(ctx, t.noBalance, undefined, { - repository: options.historyRepository, - record - }) - appendConversation(options.memoryStore, record, record.rawText, t.noBalance) - return - } - - if (proposal.status === 'proposal') { - await promptRepository.upsertPendingAction({ - telegramUserId: record.senderTelegramUserId, - telegramChatId: record.chatId, - action: PAYMENT_TOPIC_CONFIRMATION_ACTION, - payload: { - ...proposal.payload, - senderTelegramUserId: record.senderTelegramUserId, - rawText: combinedText, - telegramChatId: record.chatId, - telegramMessageId: record.messageId, - telegramThreadId: record.threadId, - telegramUpdateId: String(record.updateId), - attachmentCount: record.attachmentCount - }, - expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) - }) - - const proposalText = formatPaymentProposalText({ - locale, - surface: 'topic', - proposal - }) - await replyToPaymentMessage( - ctx, - proposalText, - paymentProposalReplyMarkup(locale, proposal.payload.proposalId), - { - repository: options.historyRepository, - record + // Handle different routes + switch (processorResult.route) { + case 'silent': { + await next() + return } - ) - appendConversation(options.memoryStore, record, record.rawText, proposalText) + + case 'chat_reply': { + await replyToPaymentMessage(ctx, processorResult.replyText, undefined, { + repository: options.historyRepository, + record + }) + appendConversation( + options.memoryStore, + record, + record.rawText, + processorResult.replyText + ) + return + } + + case 'dismiss_workflow': { + if (activeWorkflow !== null) { + await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) + } + if (processorResult.replyText) { + await replyToPaymentMessage(ctx, processorResult.replyText, undefined, { + repository: options.historyRepository, + record + }) + appendConversation( + options.memoryStore, + record, + record.rawText, + processorResult.replyText + ) + } + return + } + + case 'topic_helper': { + const financeService = financeServiceForHousehold(record.householdId) + const member = await financeService.getMemberByTelegramUserId( + record.senderTelegramUserId + ) + if (!member) { + await next() + return + } + + const balanceReply = await maybeCreatePaymentBalanceReply({ + rawText: combinedText, + householdId: record.householdId, + memberId: member.id, + financeService, + householdConfigurationRepository + }) + + if (!balanceReply) { + await next() + return + } + + const helperText = formatPaymentBalanceReplyText(locale, balanceReply) + await replyToPaymentMessage(ctx, helperText, undefined, { + repository: options.historyRepository, + record + }) + appendConversation(options.memoryStore, record, record.rawText, helperText) + return + } + + case 'payment_clarification': { + await promptRepository.upsertPendingAction({ + telegramUserId: record.senderTelegramUserId, + telegramChatId: record.chatId, + action: PAYMENT_TOPIC_CLARIFICATION_ACTION, + payload: { + threadId: record.threadId, + rawText: combinedText + }, + expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) + }) + + await replyToPaymentMessage(ctx, processorResult.clarificationQuestion, undefined, { + repository: options.historyRepository, + record + }) + appendConversation( + options.memoryStore, + record, + record.rawText, + processorResult.clarificationQuestion + ) + return + } + + case 'payment': { + const t = getBotTranslations(locale).payments + const financeService = financeServiceForHousehold(record.householdId) + const member = await financeService.getMemberByTelegramUserId( + record.senderTelegramUserId + ) + + if (!member) { + await next() + return + } + + // Create payment proposal using the parsed data from topic processor + const amountMajor = Money.fromMinor( + BigInt(processorResult.amountMinor), + processorResult.currency + ).toMajorString() + const proposal = await maybeCreatePaymentProposal({ + rawText: `paid ${processorResult.kind} ${amountMajor} ${processorResult.currency}`, + householdId: record.householdId, + memberId: member.id, + financeService, + householdConfigurationRepository + }) + + if (proposal.status === 'no_intent' || proposal.status === 'clarification') { + await promptRepository.upsertPendingAction({ + telegramUserId: record.senderTelegramUserId, + telegramChatId: record.chatId, + action: PAYMENT_TOPIC_CLARIFICATION_ACTION, + payload: { + threadId: record.threadId, + rawText: combinedText + }, + expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) + }) + + await replyToPaymentMessage(ctx, t.clarification, undefined, { + repository: options.historyRepository, + record + }) + appendConversation(options.memoryStore, record, record.rawText, t.clarification) + return + } + + await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) + + if (proposal.status === 'unsupported_currency') { + await replyToPaymentMessage(ctx, t.unsupportedCurrency, undefined, { + repository: options.historyRepository, + record + }) + appendConversation(options.memoryStore, record, record.rawText, t.unsupportedCurrency) + return + } + + if (proposal.status === 'no_balance') { + await replyToPaymentMessage(ctx, t.noBalance, undefined, { + repository: options.historyRepository, + record + }) + appendConversation(options.memoryStore, record, record.rawText, t.noBalance) + return + } + + if (proposal.status === 'proposal') { + await promptRepository.upsertPendingAction({ + telegramUserId: record.senderTelegramUserId, + telegramChatId: record.chatId, + action: PAYMENT_TOPIC_CONFIRMATION_ACTION, + payload: { + ...proposal.payload, + senderTelegramUserId: record.senderTelegramUserId, + rawText: combinedText, + telegramChatId: record.chatId, + telegramMessageId: record.messageId, + telegramThreadId: record.threadId, + telegramUpdateId: String(record.updateId), + attachmentCount: record.attachmentCount + }, + expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) + }) + + const proposalText = formatPaymentProposalText({ + locale, + surface: 'topic', + proposal + }) + await replyToPaymentMessage( + ctx, + proposalText, + paymentProposalReplyMarkup(locale, proposal.payload.proposalId), + { + repository: options.historyRepository, + record + } + ) + appendConversation(options.memoryStore, record, record.rawText, proposalText) + } + return + } + + default: { + await next() + return + } + } } + + // No topic processor available - bot sleeps + const { botSleepsMessage } = await import('./topic-processor') + await replyToPaymentMessage(ctx, botSleepsMessage(locale === 'ru' ? 'ru' : 'en'), undefined, { + repository: options.historyRepository, + record + }) } catch (error) { options.logger?.error( { diff --git a/apps/bot/src/purchase-topic-ingestion.test.ts b/apps/bot/src/purchase-topic-ingestion.test.ts index 5d6c62f..4bdd21f 100644 --- a/apps/bot/src/purchase-topic-ingestion.test.ts +++ b/apps/bot/src/purchase-topic-ingestion.test.ts @@ -512,6 +512,9 @@ describe('registerPurchaseTopicIngestion', () => { async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -605,6 +608,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -670,6 +676,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -757,6 +766,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -866,6 +878,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -942,6 +957,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1037,6 +1055,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1125,6 +1146,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1182,6 +1206,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1224,6 +1251,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1271,6 +1301,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1325,6 +1358,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1391,6 +1427,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1457,6 +1496,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1516,6 +1558,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1581,6 +1626,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1655,6 +1703,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1712,6 +1763,9 @@ Confirm or cancel below.` async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1814,6 +1868,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1865,6 +1922,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1942,6 +2002,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -1992,6 +2055,9 @@ Confirm or cancel below.`, async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -2051,17 +2117,9 @@ Confirm or cancel below.`, repository, { historyRepository, - router: async (input) => { + topicProcessor: async (input) => { if (input.messageText.includes('картошки')) { - return { - route: 'silent', - replyText: null, - helperKind: null, - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 90, - reason: 'planning' - } + return { route: 'silent', reason: 'planning' } } recentTurnTexts = input.recentThreadMessages?.map((turn) => turn.text) ?? [] @@ -2069,10 +2127,6 @@ Confirm or cancel below.`, return { route: 'chat_reply', replyText: 'No leaked context here.', - helperKind: 'assistant', - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 91, reason: 'thread_scoped' } } @@ -2136,6 +2190,9 @@ Confirm or cancel below.`, participants: participants() } }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -2195,6 +2252,9 @@ Participants: async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -2258,6 +2318,9 @@ Participants: participants: participants() } }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { throw new Error('not used') }, @@ -2311,6 +2374,9 @@ Participants: async confirm() { throw new Error('not used') }, + async saveWithInterpretation() { + throw new Error('not implemented') + }, async cancel() { return { status: 'cancelled' as const, diff --git a/apps/bot/src/purchase-topic-ingestion.ts b/apps/bot/src/purchase-topic-ingestion.ts index d7a851b..4df3e5c 100644 --- a/apps/bot/src/purchase-topic-ingestion.ts +++ b/apps/bot/src/purchase-topic-ingestion.ts @@ -29,6 +29,7 @@ import { type TopicMessageRouter, type TopicMessageRoutingResult } from './topic-message-router' +import { asOptionalBigInt } from './topic-processor' import { persistTopicHistoryMessage, telegramMessageIdFromMessage, @@ -210,6 +211,9 @@ export type PurchaseProposalAmountCorrectionResult = export interface PurchaseMessageIngestionRepository { hasClarificationContext(record: PurchaseTopicRecord): Promise clearClarificationContext?(record: PurchaseTopicRecord): Promise + /** + * @deprecated Use saveWithInterpretation instead. This method will be removed. + */ save( record: PurchaseTopicRecord, interpreter?: PurchaseMessageInterpreter, @@ -219,6 +223,10 @@ export interface PurchaseMessageIngestionRepository { assistantTone?: string | null } ): Promise + saveWithInterpretation( + record: PurchaseTopicRecord, + interpretation: PurchaseInterpretation + ): Promise confirm( purchaseMessageId: string, actorTelegramUserId: string @@ -374,6 +382,37 @@ function normalizeInterpretation( } } +export function toPurchaseInterpretation( + result: import('./topic-processor').TopicProcessorPurchaseResult +): PurchaseInterpretation { + return { + decision: 'purchase', + amountMinor: asOptionalBigInt(result.amountMinor), + currency: result.currency, + itemDescription: result.itemDescription, + amountSource: result.amountSource, + calculationExplanation: result.calculationExplanation, + participantMemberIds: result.participantMemberIds, + confidence: result.confidence, + parserMode: 'llm', + clarificationQuestion: null + } +} + +export function toPurchaseClarificationInterpretation( + result: import('./topic-processor').TopicProcessorClarificationResult +): PurchaseInterpretation { + return { + decision: 'clarification', + amountMinor: null, + currency: null, + itemDescription: null, + confidence: 0, + parserMode: 'llm', + clarificationQuestion: result.clarificationQuestion + } +} + function needsReviewAsInt(value: boolean): number { return value ? 1 : 0 } @@ -1206,6 +1245,119 @@ export function createPurchaseMessageRepository(databaseUrl: string): { } }, + async saveWithInterpretation(record, interpretation) { + const matchedMember = await db + .select({ id: schema.members.id }) + .from(schema.members) + .where( + and( + eq(schema.members.householdId, record.householdId), + eq(schema.members.telegramUserId, record.senderTelegramUserId) + ) + ) + .limit(1) + + const senderMemberId = matchedMember[0]?.id ?? null + + const decision = normalizeInterpretation(interpretation, null) + + const inserted = await db + .insert(schema.purchaseMessages) + .values({ + householdId: record.householdId, + senderMemberId, + senderTelegramUserId: record.senderTelegramUserId, + senderDisplayName: record.senderDisplayName, + rawText: record.rawText, + telegramChatId: record.chatId, + telegramMessageId: record.messageId, + telegramThreadId: record.threadId, + telegramUpdateId: String(record.updateId), + messageSentAt: instantToDate(record.messageSentAt), + parsedAmountMinor: decision.parsedAmountMinor, + parsedCurrency: decision.parsedCurrency, + parsedItemDescription: decision.parsedItemDescription, + parserMode: decision.parserMode, + parserConfidence: decision.parserConfidence, + needsReview: needsReviewAsInt(decision.needsReview), + parserError: decision.parserError, + processingStatus: decision.status + }) + .onConflictDoNothing({ + target: [ + schema.purchaseMessages.householdId, + schema.purchaseMessages.telegramChatId, + schema.purchaseMessages.telegramMessageId + ] + }) + .returning({ id: schema.purchaseMessages.id }) + + const insertedRow = inserted[0] + if (!insertedRow) { + return { + status: 'duplicate' + } + } + + switch (decision.status) { + case 'ignored_not_purchase': + return { + status: 'ignored_not_purchase', + purchaseMessageId: insertedRow.id + } + case 'clarification_needed': + return { + status: 'clarification_needed', + purchaseMessageId: insertedRow.id, + clarificationQuestion: decision.clarificationQuestion, + parsedAmountMinor: decision.parsedAmountMinor, + parsedCurrency: decision.parsedCurrency, + parsedItemDescription: decision.parsedItemDescription, + amountSource: decision.amountSource, + calculationExplanation: decision.calculationExplanation, + parserConfidence: decision.parserConfidence, + parserMode: decision.parserMode + } + case 'pending_confirmation': { + const participants = await defaultProposalParticipants({ + householdId: record.householdId, + senderTelegramUserId: record.senderTelegramUserId, + senderMemberId, + messageSentAt: record.messageSentAt, + explicitParticipantMemberIds: decision.participantMemberIds + }) + + if (participants.length > 0) { + await db.insert(schema.purchaseMessageParticipants).values( + participants.map((participant) => ({ + purchaseMessageId: insertedRow.id, + memberId: participant.memberId, + included: participantIncludedAsInt(participant.included) + })) + ) + } + + return { + status: 'pending_confirmation', + purchaseMessageId: insertedRow.id, + parsedAmountMinor: decision.parsedAmountMinor!, + parsedCurrency: decision.parsedCurrency!, + parsedItemDescription: decision.parsedItemDescription!, + amountSource: decision.amountSource, + calculationExplanation: decision.calculationExplanation, + parserConfidence: decision.parserConfidence ?? MIN_PROPOSAL_CONFIDENCE, + parserMode: decision.parserMode ?? 'llm', + participants: toProposalParticipants(await getStoredParticipants(insertedRow.id)) + } + } + case 'parse_failed': + return { + status: 'parse_failed', + purchaseMessageId: insertedRow.id + } + } + }, + async confirm(purchaseMessageId, actorTelegramUserId) { return mutateProposalStatus(purchaseMessageId, actorTelegramUserId, 'confirmed') }, @@ -2194,6 +2346,8 @@ export function registerConfiguredPurchaseTopicIngestion( options: { interpreter?: PurchaseMessageInterpreter router?: TopicMessageRouter + topicProcessor?: import('./topic-processor').TopicProcessor + contextCache?: import('./household-context-cache').HouseholdContextCache memoryStore?: AssistantConversationMemoryStore historyRepository?: TopicMessageHistoryRepository logger?: Logger @@ -2232,98 +2386,216 @@ export function registerConfiguredPurchaseTopicIngestion( let typingIndicator: ReturnType | null = null try { - const [billingSettings, assistantConfig] = await Promise.all([ - householdConfigurationRepository.getHouseholdBillingSettings(record.householdId), - resolveAssistantConfig(householdConfigurationRepository, record.householdId) - ]) - const locale = await resolveHouseholdLocale( - householdConfigurationRepository, - record.householdId - ) - const route = - getCachedTopicMessageRoute(ctx, 'purchase') ?? - (await routePurchaseTopicMessage({ - ctx, - record, - locale, - repository, - router: options.router, - memoryStore: options.memoryStore, - historyRepository: options.historyRepository, - assistantContext: assistantConfig.assistantContext, - assistantTone: assistantConfig.assistantTone - })) - cacheTopicMessageRoute(ctx, 'purchase', route) - - if (route.route === 'silent') { - rememberUserTurn(options.memoryStore, record) - await next() - return - } - - if (route.shouldClearWorkflow) { - await repository.clearClarificationContext?.(record) - } - - if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { - rememberUserTurn(options.memoryStore, record) - if (route.replyText) { - await replyToPurchaseMessage(ctx, route.replyText, undefined, { - repository: options.historyRepository, - record + // Load household context (cached) + const householdContext = options.contextCache + ? await options.contextCache.get(record.householdId, async () => { + const [billingSettings, assistantConfig] = await Promise.all([ + householdConfigurationRepository.getHouseholdBillingSettings(record.householdId), + resolveAssistantConfig(householdConfigurationRepository, record.householdId) + ]) + const locale = await resolveHouseholdLocale( + householdConfigurationRepository, + record.householdId + ) + return { + householdContext: assistantConfig.assistantContext, + assistantTone: assistantConfig.assistantTone, + defaultCurrency: billingSettings.settlementCurrency, + locale, + cachedAt: Date.now() + } }) - rememberAssistantTurn(options.memoryStore, record, route.replyText) + : { + householdContext: null as string | null, + assistantTone: null as string | null, + defaultCurrency: 'GEL' as const, + locale: 'en' as BotLocale, + cachedAt: Date.now() + } + + // Build conversation context + const activeWorkflow = (await repository.hasClarificationContext(record)) + ? 'purchase_clarification' + : null + + const conversationContext = await buildConversationContext({ + repository: options.historyRepository, + householdId: record.householdId, + telegramChatId: record.chatId, + telegramThreadId: record.threadId, + telegramUserId: record.senderTelegramUserId, + topicRole: 'purchase', + activeWorkflow, + messageText: record.rawText, + explicitMention: stripExplicitBotMention(ctx) !== null, + replyToBot: isReplyToCurrentBot(ctx), + directBotAddress: false, + memoryStore: options.memoryStore ?? { + get() { + return { summary: null, turns: [] } + }, + appendTurn() { + return { summary: null, turns: [] } + } } - return - } + }) - if (route.route === 'topic_helper') { - await next() - return - } + // Get household members for the processor + const householdMembers = await (async () => { + if (!options.topicProcessor) return [] + // This will be loaded from DB in the actual implementation + // For now, we return empty array - the processor will work without it + return [] + })() + + // Use topic processor if available, fall back to legacy router + if (options.topicProcessor) { + const processorResult = await options.topicProcessor({ + locale: householdContext.locale === 'ru' ? 'ru' : 'en', + topicRole: 'purchase', + messageText: record.rawText, + isExplicitMention: conversationContext.explicitMention, + isReplyToBot: conversationContext.replyToBot, + activeWorkflow, + defaultCurrency: householdContext.defaultCurrency, + householdContext: householdContext.householdContext, + assistantTone: householdContext.assistantTone, + householdMembers, + senderMemberId: null, // Will be resolved in saveWithInterpretation + recentThreadMessages: conversationContext.recentThreadMessages.map((m) => ({ + role: m.role, + speaker: m.speaker, + text: m.text + })), + recentChatMessages: conversationContext.recentSessionMessages.map((m) => ({ + role: m.role, + speaker: m.speaker, + text: m.text + })), + recentTurns: conversationContext.recentTurns, + engagementAssessment: conversationContext.engagement + }) + + // Handle processor failure - fun "bot sleeps" message + if (!processorResult) { + const { botSleepsMessage } = await import('./topic-processor') + await replyToPurchaseMessage( + ctx, + botSleepsMessage(householdContext.locale === 'ru' ? 'ru' : 'en'), + undefined, + { + repository: options.historyRepository, + record + } + ) + return + } - if (route.route !== 'purchase_candidate' && route.route !== 'purchase_followup') { rememberUserTurn(options.memoryStore, record) - await next() - return + + // Handle different routes + switch (processorResult.route) { + case 'silent': { + await next() + return + } + + case 'chat_reply': { + await replyToPurchaseMessage(ctx, processorResult.replyText, undefined, { + repository: options.historyRepository, + record + }) + rememberAssistantTurn(options.memoryStore, record, processorResult.replyText) + return + } + + case 'topic_helper': { + await next() + return + } + + case 'dismiss_workflow': { + await repository.clearClarificationContext?.(record) + if (processorResult.replyText) { + await replyToPurchaseMessage(ctx, processorResult.replyText, undefined, { + repository: options.historyRepository, + record + }) + rememberAssistantTurn(options.memoryStore, record, processorResult.replyText) + } + return + } + + case 'purchase_clarification': { + typingIndicator = startTypingIndicator(ctx) + const interpretation = toPurchaseClarificationInterpretation(processorResult) + const result = await repository.saveWithInterpretation(record, interpretation) + await handlePurchaseMessageResult( + ctx, + record, + result, + householdContext.locale, + options.logger, + null, + options.historyRepository + ) + rememberAssistantTurn( + options.memoryStore, + record, + buildPurchaseAcknowledgement(result, householdContext.locale) + ) + return + } + + case 'purchase': { + typingIndicator = startTypingIndicator(ctx) + const interpretation = toPurchaseInterpretation(processorResult) + const pendingReply = await sendPurchaseProcessingReply( + ctx, + getBotTranslations(householdContext.locale).purchase.processing + ) + const result = await repository.saveWithInterpretation(record, interpretation) + + if (result.status === 'ignored_not_purchase') { + await repository.clearClarificationContext?.(record) + await next() + return + } + + await handlePurchaseMessageResult( + ctx, + record, + result, + householdContext.locale, + options.logger, + pendingReply, + options.historyRepository + ) + rememberAssistantTurn( + options.memoryStore, + record, + buildPurchaseAcknowledgement(result, householdContext.locale) + ) + return + } + + default: { + await next() + return + } + } } - rememberUserTurn(options.memoryStore, record) - typingIndicator = - options.interpreter && route.shouldStartTyping ? startTypingIndicator(ctx) : null - const pendingReply = - options.interpreter && shouldShowProcessingReply(ctx, record, route) - ? await sendPurchaseProcessingReply(ctx, getBotTranslations(locale).purchase.processing) - : null - const result = await repository.save( - record, - options.interpreter, - billingSettings.settlementCurrency, - { - householdContext: assistantConfig.assistantContext, - assistantTone: assistantConfig.assistantTone - } - ) - if (result.status === 'ignored_not_purchase') { - if (route.route === 'purchase_followup') { - await repository.clearClarificationContext?.(record) - } - return await next() - } - - await handlePurchaseMessageResult( + // No topic processor available - bot sleeps + const { botSleepsMessage } = await import('./topic-processor') + await replyToPurchaseMessage( ctx, - record, - result, - locale, - options.logger, - pendingReply, - options.historyRepository - ) - rememberAssistantTurn( - options.memoryStore, - record, - buildPurchaseAcknowledgement(result, locale) + botSleepsMessage(householdContext.locale === 'ru' ? 'ru' : 'en'), + undefined, + { + repository: options.historyRepository, + record + } ) } catch (error) { options.logger?.error( diff --git a/apps/bot/src/topic-message-router.test.ts b/apps/bot/src/topic-message-router.test.ts index 75ac0cf..6daedb1 100644 --- a/apps/bot/src/topic-message-router.test.ts +++ b/apps/bot/src/topic-message-router.test.ts @@ -1,174 +1,136 @@ import { describe, expect, test } from 'bun:test' -import { createOpenAiTopicMessageRouter } from './topic-message-router' +import { fallbackTopicMessageRoute } from './topic-message-router' -function successfulResponse(payload: unknown): Response { - return new Response(JSON.stringify(payload), { - status: 200, - headers: { - 'content-type': 'application/json' - } - }) -} - -describe('createOpenAiTopicMessageRouter', () => { - test('does not override purchase routes for planning chatter', async () => { - const router = createOpenAiTopicMessageRouter('test-key', 'gpt-5-mini', 20_000) - expect(router).toBeDefined() - - const originalFetch = globalThis.fetch - globalThis.fetch = (async () => - successfulResponse({ - output_text: JSON.stringify({ - route: 'purchase_candidate', - replyText: null, - helperKind: 'purchase', - shouldStartTyping: true, - shouldClearWorkflow: false, - confidence: 92, - reason: 'llm_purchase_guess' - }) - })) as unknown as typeof fetch - - try { - const route = await router!({ - locale: 'ru', - topicRole: 'purchase', - messageText: 'Я хочу рыбу. Завтра подумаю, примерно 20 лари.', - isExplicitMention: true, - isReplyToBot: false, - activeWorkflow: null - }) - - expect(route).toMatchObject({ - route: 'purchase_candidate', - helperKind: 'purchase', - shouldStartTyping: true, - shouldClearWorkflow: false, - reason: 'llm_purchase_guess' - }) - } finally { - globalThis.fetch = originalFetch - } +describe('fallbackTopicMessageRoute', () => { + test('returns silent for empty messages', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'purchase', + messageText: '', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: null + }) + expect(route.route).toBe('silent') + expect(route.reason).toBe('empty') }) - test('does not override purchase followups for meta references', async () => { - const router = createOpenAiTopicMessageRouter('test-key', 'gpt-5-mini', 20_000) - expect(router).toBeDefined() - - const originalFetch = globalThis.fetch - globalThis.fetch = (async () => - successfulResponse({ - output_text: JSON.stringify({ - route: 'purchase_followup', - replyText: null, - helperKind: 'purchase', - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 89, - reason: 'llm_followup_guess' - }) - })) as unknown as typeof fetch - - try { - const route = await router!({ - locale: 'ru', - topicRole: 'purchase', - messageText: 'Я уже сказал выше', - isExplicitMention: false, - isReplyToBot: true, - activeWorkflow: 'purchase_clarification' - }) - - expect(route).toMatchObject({ - route: 'purchase_followup', - helperKind: 'purchase', - shouldStartTyping: false, - shouldClearWorkflow: false, - reason: 'llm_followup_guess' - }) - } finally { - globalThis.fetch = originalFetch - } + test('returns purchase_followup for active purchase clarification workflow', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'purchase', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: 'purchase_clarification' + }) + expect(route.route).toBe('purchase_followup') + expect(route.helperKind).toBe('purchase') }) - test('keeps payment followups when a context reference also includes payment details', async () => { - const router = createOpenAiTopicMessageRouter('test-key', 'gpt-5-mini', 20_000) - expect(router).toBeDefined() - - const originalFetch = globalThis.fetch - globalThis.fetch = (async () => - successfulResponse({ - output_text: JSON.stringify({ - route: 'payment_followup', - replyText: null, - helperKind: 'payment', - shouldStartTyping: false, - shouldClearWorkflow: false, - confidence: 90, - reason: 'llm_payment_followup' - }) - })) as unknown as typeof fetch - - try { - const route = await router!({ - locale: 'ru', - topicRole: 'payments', - messageText: 'Я уже сказал выше, оплатил 100 лари', - isExplicitMention: false, - isReplyToBot: true, - activeWorkflow: 'payment_clarification' - }) - - expect(route).toMatchObject({ - route: 'payment_followup', - helperKind: 'payment', - shouldStartTyping: false, - shouldClearWorkflow: false, - reason: 'llm_payment_followup' - }) - } finally { - globalThis.fetch = originalFetch - } + test('returns payment_followup for active payment clarification workflow', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'payments', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: 'payment_clarification' + }) + expect(route.route).toBe('payment_followup') + expect(route.helperKind).toBe('payment') }) - test('keeps purchase followups for approximate clarification answers', async () => { - const router = createOpenAiTopicMessageRouter('test-key', 'gpt-5-mini', 20_000) - expect(router).toBeDefined() + test('returns payment_followup for active payment confirmation workflow', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'payments', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: 'payment_confirmation' + }) + expect(route.route).toBe('payment_followup') + expect(route.helperKind).toBe('payment') + }) - const originalFetch = globalThis.fetch - globalThis.fetch = (async () => - successfulResponse({ - output_text: JSON.stringify({ - route: 'purchase_followup', - replyText: null, - helperKind: 'purchase', - shouldStartTyping: true, - shouldClearWorkflow: false, - confidence: 86, - reason: 'llm_purchase_followup' - }) - })) as unknown as typeof fetch + test('returns topic_helper for strong reference', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'generic', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: null, + engagementAssessment: { + engaged: true, + reason: 'strong_reference', + strongReference: true, + weakSessionActive: false, + hasOpenBotQuestion: false + } + }) + expect(route.route).toBe('topic_helper') + expect(route.helperKind).toBe('assistant') + }) - try { - const route = await router!({ - locale: 'ru', - topicRole: 'purchase', - messageText: 'примерно 20 лари', - isExplicitMention: false, - isReplyToBot: true, - activeWorkflow: 'purchase_clarification' - }) + test('returns topic_helper for weak session', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'generic', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: null, + engagementAssessment: { + engaged: true, + reason: 'weak_session', + strongReference: false, + weakSessionActive: true, + hasOpenBotQuestion: false + } + }) + expect(route.route).toBe('topic_helper') + expect(route.helperKind).toBe('assistant') + }) - expect(route).toMatchObject({ - route: 'purchase_followup', - helperKind: 'purchase', - shouldStartTyping: true, - shouldClearWorkflow: false, - reason: 'llm_purchase_followup' - }) - } finally { - globalThis.fetch = originalFetch - } + test('returns topic_helper for explicit mention', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'generic', + messageText: 'some message', + isExplicitMention: true, + isReplyToBot: false, + activeWorkflow: null + }) + expect(route.route).toBe('topic_helper') + expect(route.helperKind).toBe('assistant') + }) + + test('returns topic_helper for reply to bot', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'generic', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: true, + activeWorkflow: null + }) + expect(route.route).toBe('topic_helper') + expect(route.helperKind).toBe('assistant') + }) + + test('returns silent by default', () => { + const route = fallbackTopicMessageRoute({ + locale: 'en', + topicRole: 'generic', + messageText: 'some message', + isExplicitMention: false, + isReplyToBot: false, + activeWorkflow: null + }) + expect(route.route).toBe('silent') + expect(route.reason).toBe('quiet_default') }) }) diff --git a/apps/bot/src/topic-message-router.ts b/apps/bot/src/topic-message-router.ts index 5773440..69a0713 100644 --- a/apps/bot/src/topic-message-router.ts +++ b/apps/bot/src/topic-message-router.ts @@ -1,7 +1,5 @@ import type { Context } from 'grammy' -import { extractOpenAiResponseText, parseJsonFromResponseText } from './openai-responses' - export type TopicMessageRole = 'generic' | 'purchase' | 'payments' | 'reminders' | 'feedback' export type TopicWorkflowState = | 'purchase_clarification' @@ -79,35 +77,6 @@ type ContextWithTopicMessageRouteCache = Context & { [topicMessageRouteCacheKey]?: TopicMessageRouteCacheEntry } -function normalizeRoute(value: string): TopicMessageRoute { - return value === 'chat_reply' || - value === 'purchase_candidate' || - value === 'purchase_followup' || - value === 'payment_candidate' || - value === 'payment_followup' || - value === 'topic_helper' || - value === 'dismiss_workflow' - ? value - : 'silent' -} - -function normalizeHelperKind(value: string | null): TopicMessageRoutingResult['helperKind'] { - return value === 'assistant' || - value === 'purchase' || - value === 'payment' || - value === 'reminder' - ? value - : null -} - -function normalizeConfidence(value: number | null | undefined): number { - if (typeof value !== 'number' || Number.isNaN(value)) { - return 0 - } - - return Math.max(0, Math.min(100, Math.round(value))) -} - export function fallbackTopicMessageRoute( input: TopicMessageRoutingInput ): TopicMessageRoutingResult { @@ -194,43 +163,6 @@ export function fallbackTopicMessageRoute( } } -function buildRecentTurns(input: TopicMessageRoutingInput): string | null { - const recentTurns = input.recentTurns - ?.slice(-4) - .map((turn) => `${turn.role}: ${turn.text.trim()}`) - .filter((line) => line.length > 0) - - return recentTurns && recentTurns.length > 0 - ? ['Recent conversation with this user in the household chat:', ...recentTurns].join('\n') - : null -} - -function buildRecentThreadMessages(input: TopicMessageRoutingInput): string | null { - const recentMessages = input.recentThreadMessages - ?.slice(-8) - .map((message) => `${message.speaker} (${message.role}): ${message.text.trim()}`) - .filter((line) => line.length > 0) - - return recentMessages && recentMessages.length > 0 - ? ['Recent messages in this topic thread:', ...recentMessages].join('\n') - : null -} - -function buildRecentChatMessages(input: TopicMessageRoutingInput): string | null { - const recentMessages = input.recentChatMessages - ?.slice(-12) - .map((message) => - message.threadId - ? `[thread ${message.threadId}] ${message.speaker} (${message.role}): ${message.text.trim()}` - : `${message.speaker} (${message.role}): ${message.text.trim()}` - ) - .filter((line) => line.length > 0) - - return recentMessages && recentMessages.length > 0 - ? ['Recent related chat messages:', ...recentMessages].join('\n') - : null -} - export function cacheTopicMessageRoute( ctx: Context, topicRole: CachedTopicMessageRole, @@ -249,201 +181,3 @@ export function getCachedTopicMessageRoute( const cached = (ctx as ContextWithTopicMessageRouteCache)[topicMessageRouteCacheKey] return cached?.topicRole === topicRole ? cached.route : null } - -export function createOpenAiTopicMessageRouter( - apiKey: string | undefined, - model: string, - timeoutMs: number -): TopicMessageRouter | undefined { - if (!apiKey) { - return undefined - } - - return async (input) => { - const abortController = new AbortController() - const timeout = setTimeout(() => abortController.abort(), timeoutMs) - - try { - const response = await fetch('https://api.openai.com/v1/responses', { - method: 'POST', - signal: abortController.signal, - headers: { - authorization: `Bearer ${apiKey}`, - 'content-type': 'application/json' - }, - body: JSON.stringify({ - model, - input: [ - { - role: 'system', - content: [ - 'You are a first-pass router for a household Telegram bot in a group chat topic.', - 'Your job is to decide whether the bot should stay silent, send a short playful reply, continue a workflow, or invoke a heavier helper.', - 'When engaged=yes OR explicit_mention=yes OR reply_to_bot=yes, you MUST respond - never use silent route.', - 'Decide from context whether the user is actually addressing the bot, talking about the bot, or talking to another person.', - 'Treat "stop", "leave me alone", "just thinking", "not a purchase", and similar messages as backoff or dismissal signals.', - 'For a bare summon like "bot?", "pss bot", or "ты тут?", prefer a brief acknowledgment with chat_reply.', - 'When the user directly addresses the bot with small talk, joking, or testing, prefer chat_reply with one short sentence.', - 'Do not repeatedly end casual replies with "how can I help?" unless the user is clearly asking for assistance.', - 'Use topic_helper only when the message is a real question or request that likely needs household knowledge or a topic-specific helper.', - 'Use the recent conversation when writing replyText. Do not ignore the already-established subject.', - 'The recent thread messages are more important than the per-user memory summary.', - 'If the user asks what you think about a price or quantity, mention the actual item/price from context when possible.', - 'Set shouldStartTyping to true only if the chosen route will likely trigger a slower helper or assistant call.', - '=== PURCHASE TOPIC RULES ===', - 'Classify as purchase_candidate when ALL of:', - '- Contains completed purchase verb (купил, bought, ordered, picked up, spent, взял, заказал, потратил)', - '- Contains realistic household item (food, groceries, household goods, toiletries, medicine, transport, cafe, restaurant)', - '- Contains amount that is realistic for household purchase (under 500 GEL/USD/EUR)', - '- NOT a fantastical/impossible item', - 'Gifts for household members ARE shared purchases - classify as purchase_candidate.', - 'Classify as chat_reply (NOT silent) with playful response when:', - '- Item is fantastical (car, plane, rocket, island, castle, yacht, apartment renovation >1000)', - '- Amount is excessively large (>500 GEL/USD/EUR)', - '- User explicitly says it is a joke, gift for non-household member, or personal expense', - 'Examples of purchase_candidate: "купил бананов 10 лари", "bought groceries 50 gel", "взял такси 15 лари", "купил Диме игрушку 20 лари"', - 'Examples of chat_reply: "купил машину", "купил квартиру", "купил самолет" (respond playfully: "Ого, записывай сам!" or similar)', - 'Use purchase_followup only when there is active purchase clarification and the latest message looks like a real answer to it.', - '=== PAYMENT TOPIC RULES ===', - 'Classify as payment_candidate when:', - '- Contains payment verb (оплатил, paid, заплатил) + rent/utilities/bills', - '- Amount is realistic (<500)', - 'Classify as chat_reply with playful response for fantastical amounts (>500).', - 'Use payment_followup only when there is active payment clarification/confirmation and the latest message looks like a real answer to it.', - '=== GENERAL ===', - 'For absurd or playful messages, be light and short with chat_reply. Never loop or interrogate.', - input.assistantTone ? `Use this tone lightly: ${input.assistantTone}.` : null, - input.assistantContext - ? `Household flavor context: ${input.assistantContext}` - : null, - 'Return only JSON matching the schema.' - ] - .filter(Boolean) - .join(' ') - }, - { - role: 'user', - content: [ - `User locale: ${input.locale}`, - `Topic role: ${input.topicRole}`, - `Explicit mention: ${input.isExplicitMention ? 'yes' : 'no'}`, - `Reply to bot: ${input.isReplyToBot ? 'yes' : 'no'}`, - `Active workflow: ${input.activeWorkflow ?? 'none'}`, - input.engagementAssessment - ? `Engagement assessment: engaged=${input.engagementAssessment.engaged ? 'yes' : 'no'}; reason=${input.engagementAssessment.reason}; strong_reference=${input.engagementAssessment.strongReference ? 'yes' : 'no'}; weak_session=${input.engagementAssessment.weakSessionActive ? 'yes' : 'no'}; open_bot_question=${input.engagementAssessment.hasOpenBotQuestion ? 'yes' : 'no'}` - : null, - buildRecentThreadMessages(input), - buildRecentChatMessages(input), - buildRecentTurns(input), - `Latest message:\n${input.messageText}` - ] - .filter(Boolean) - .join('\n\n') - } - ], - text: { - format: { - type: 'json_schema', - name: 'topic_message_route', - schema: { - type: 'object', - additionalProperties: false, - properties: { - route: { - type: 'string', - enum: [ - 'silent', - 'chat_reply', - 'purchase_candidate', - 'purchase_followup', - 'payment_candidate', - 'payment_followup', - 'topic_helper', - 'dismiss_workflow' - ] - }, - replyText: { - anyOf: [{ type: 'string' }, { type: 'null' }] - }, - helperKind: { - anyOf: [ - { - type: 'string', - enum: ['assistant', 'purchase', 'payment', 'reminder'] - }, - { type: 'null' } - ] - }, - shouldStartTyping: { - type: 'boolean' - }, - shouldClearWorkflow: { - type: 'boolean' - }, - confidence: { - type: 'number', - minimum: 0, - maximum: 100 - }, - reason: { - anyOf: [{ type: 'string' }, { type: 'null' }] - } - }, - required: [ - 'route', - 'replyText', - 'helperKind', - 'shouldStartTyping', - 'shouldClearWorkflow', - 'confidence', - 'reason' - ] - } - } - } - }) - }) - - if (!response.ok) { - return fallbackTopicMessageRoute(input) - } - - const payload = (await response.json()) as Record - const text = extractOpenAiResponseText(payload) - const parsed = parseJsonFromResponseText(text ?? '') - - if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { - return fallbackTopicMessageRoute(input) - } - - const parsedObject = parsed as Record - - const route = normalizeRoute( - typeof parsedObject.route === 'string' ? parsedObject.route : 'silent' - ) - const replyText = - typeof parsedObject.replyText === 'string' && parsedObject.replyText.trim().length > 0 - ? parsedObject.replyText.trim() - : null - - return { - route, - replyText, - helperKind: - typeof parsedObject.helperKind === 'string' || parsedObject.helperKind === null - ? normalizeHelperKind(parsedObject.helperKind) - : null, - shouldStartTyping: parsedObject.shouldStartTyping === true, - shouldClearWorkflow: parsedObject.shouldClearWorkflow === true, - confidence: normalizeConfidence( - typeof parsedObject.confidence === 'number' ? parsedObject.confidence : null - ), - reason: typeof parsedObject.reason === 'string' ? parsedObject.reason : null - } - } catch { - return fallbackTopicMessageRoute(input) - } finally { - clearTimeout(timeout) - } - } -} diff --git a/apps/bot/src/topic-processor.ts b/apps/bot/src/topic-processor.ts new file mode 100644 index 0000000..d3b491c --- /dev/null +++ b/apps/bot/src/topic-processor.ts @@ -0,0 +1,533 @@ +import { extractOpenAiResponseText, parseJsonFromResponseText } from './openai-responses' +import type { TopicWorkflowState } from './topic-message-router' +import type { EngagementAssessment } from './conversation-orchestrator' + +export type TopicProcessorRoute = + | 'silent' + | 'chat_reply' + | 'purchase' + | 'purchase_clarification' + | 'payment' + | 'payment_clarification' + | 'topic_helper' + | 'dismiss_workflow' + +export interface TopicProcessorPurchaseResult { + route: 'purchase' + amountMinor: string + currency: 'GEL' | 'USD' + itemDescription: string + amountSource: 'explicit' | 'calculated' + calculationExplanation: string | null + participantMemberIds: string[] | null + confidence: number + reason: string +} + +export interface TopicProcessorPaymentResult { + route: 'payment' + kind: 'rent' | 'utilities' + amountMinor: string + currency: 'GEL' | 'USD' + confidence: number + reason: string +} + +export interface TopicProcessorChatReplyResult { + route: 'chat_reply' + replyText: string + reason: string +} + +export interface TopicProcessorSilentResult { + route: 'silent' + reason: string +} + +export interface TopicProcessorClarificationResult { + route: 'purchase_clarification' | 'payment_clarification' + clarificationQuestion: string + reason: string +} + +export interface TopicProcessorTopicHelperResult { + route: 'topic_helper' + reason: string +} + +export interface TopicProcessorDismissWorkflowResult { + route: 'dismiss_workflow' + replyText: string | null + reason: string +} + +export type TopicProcessorResult = + | TopicProcessorSilentResult + | TopicProcessorChatReplyResult + | TopicProcessorPurchaseResult + | TopicProcessorClarificationResult + | TopicProcessorPaymentResult + | TopicProcessorTopicHelperResult + | TopicProcessorDismissWorkflowResult + +export interface TopicProcessorHouseholdMember { + memberId: string + displayName: string + status: 'active' | 'away' | 'left' +} + +export interface TopicProcessorMessage { + role: 'user' | 'assistant' + speaker: string + text: string +} + +export interface TopicProcessorTurn { + role: 'user' | 'assistant' + text: string +} + +export interface TopicProcessorInput { + locale: 'en' | 'ru' + topicRole: 'purchase' | 'payments' | 'generic' + messageText: string + isExplicitMention: boolean + isReplyToBot: boolean + activeWorkflow: TopicWorkflowState + defaultCurrency: 'GEL' | 'USD' + householdContext: string | null + assistantTone: string | null + householdMembers: readonly TopicProcessorHouseholdMember[] + senderMemberId: string | null + recentThreadMessages: readonly TopicProcessorMessage[] + recentChatMessages: readonly TopicProcessorMessage[] + recentTurns: readonly TopicProcessorTurn[] + engagementAssessment: EngagementAssessment +} + +export type TopicProcessor = (input: TopicProcessorInput) => Promise + +export function asOptionalBigInt(value: string | null): bigint | null { + if (value === null || !/^[0-9]+$/.test(value)) { + return null + } + + const parsed = BigInt(value) + return parsed > 0n ? parsed : null +} + +export function normalizeCurrency(value: string | null): 'GEL' | 'USD' | null { + return value === 'GEL' || value === 'USD' ? value : null +} + +export function normalizeConfidence(value: number): number { + const scaled = value >= 0 && value <= 1 ? value * 100 : value + return Math.max(0, Math.min(100, Math.round(scaled))) +} + +export function normalizeParticipantMemberIds( + value: readonly string[] | null | undefined, + householdMembers: readonly TopicProcessorHouseholdMember[] +): readonly string[] | null { + if (!value || value.length === 0) { + return null + } + + const allowedMemberIds = new Set(householdMembers.map((member) => member.memberId)) + const normalized = value + .map((memberId) => memberId.trim()) + .filter((memberId) => memberId.length > 0) + .filter((memberId, index, all) => all.indexOf(memberId) === index) + .filter((memberId) => allowedMemberIds.has(memberId)) + + return normalized.length > 0 ? normalized : null +} + +function normalizeRoute(value: string): TopicProcessorRoute { + switch (value) { + case 'silent': + case 'chat_reply': + case 'purchase': + case 'purchase_clarification': + case 'payment': + case 'payment_clarification': + case 'topic_helper': + case 'dismiss_workflow': + return value + default: + return 'silent' + } +} + +interface OpenAiStructuredResult { + route: TopicProcessorRoute + replyText?: string | null + clarificationQuestion?: string | null + amountMinor?: string | null + currency?: 'GEL' | 'USD' | null + itemDescription?: string | null + amountSource?: 'explicit' | 'calculated' | null + calculationExplanation?: string | null + participantMemberIds?: string[] | null + kind?: 'rent' | 'utilities' | null + confidence?: number + reason?: string | null +} + +function buildContextSection(input: TopicProcessorInput): string { + const parts: string[] = [] + + parts.push(`User locale: ${input.locale}`) + parts.push(`Topic role: ${input.topicRole}`) + parts.push(`Default currency: ${input.defaultCurrency}`) + parts.push(`Explicit mention: ${input.isExplicitMention ? 'yes' : 'no'}`) + parts.push(`Reply to bot: ${input.isReplyToBot ? 'yes' : 'no'}`) + parts.push(`Active workflow: ${input.activeWorkflow ?? 'none'}`) + parts.push( + `Engagement: engaged=${input.engagementAssessment.engaged ? 'yes' : 'no'}; reason=${input.engagementAssessment.reason}` + ) + + if (input.householdContext) { + parts.push(`Household context: ${input.householdContext}`) + } + + if (input.householdMembers.length > 0) { + parts.push( + 'Household members:\n' + + input.householdMembers + .map( + (m) => + `- ${m.memberId}: ${m.displayName} (status=${m.status}${m.memberId === input.senderMemberId ? ', sender=yes' : ''})` + ) + .join('\n') + ) + } + + return parts.join('\n') +} + +function buildRecentMessagesSection(input: TopicProcessorInput): string | null { + const parts: string[] = [] + + if (input.recentThreadMessages.length > 0) { + parts.push( + 'Recent messages in this thread:\n' + + input.recentThreadMessages + .slice(-8) + .map((m) => `${m.speaker} (${m.role}): ${m.text}`) + .join('\n') + ) + } + + if (input.recentChatMessages.length > 0) { + parts.push( + 'Recent chat messages:\n' + + input.recentChatMessages + .slice(-6) + .map((m) => `${m.speaker} (${m.role}): ${m.text}`) + .join('\n') + ) + } + + if (input.recentTurns.length > 0) { + parts.push( + 'Recent conversation with this user:\n' + + input.recentTurns + .slice(-4) + .map((t) => `${t.role}: ${t.text}`) + .join('\n') + ) + } + + return parts.length > 0 ? parts.join('\n\n') : null +} + +export function createTopicProcessor( + apiKey: string | undefined, + model: string, + timeoutMs: number +): TopicProcessor | undefined { + if (!apiKey) { + return undefined + } + + return async (input) => { + const abortController = new AbortController() + const timeout = setTimeout(() => abortController.abort(), timeoutMs) + + try { + const contextSection = buildContextSection(input) + const messagesSection = buildRecentMessagesSection(input) + + const response = await fetch('https://api.openai.com/v1/responses', { + method: 'POST', + signal: abortController.signal, + headers: { + authorization: `Bearer ${apiKey}`, + 'content-type': 'application/json' + }, + body: JSON.stringify({ + model, + input: [ + { + type: 'message', + role: 'system', + content: `You are the brain of Kojori, a household Telegram bot. You process every message in a topic and decide the right action. + +=== WHEN TO STAY SILENT === +- Default to silent in group topics unless one of the following is true: + - The message reports a completed purchase or payment (your primary purpose in these topics) + - The user addresses the bot (by @mention, reply to bot, or text reference in ANY language — бот, bot, kojori, кожори, or any recognizable variant) + - There is an active clarification/confirmation workflow for this user + - The user is clearly engaged with the bot (recent bot interaction, strong context reference) +- Regular chat between users (plans, greetings, discussion) → silent + +=== PURCHASE TOPIC (topicRole=purchase) === +Purchase detection is CONTENT-BASED — engagement signals are irrelevant for this decision. +If the message reports a completed purchase (past-tense buy verb + realistic item + amount), classify as "purchase" REGARDLESS of mention/engagement. +- Completed buy verbs: купил, bought, ordered, picked up, spent, взял, заказал, потратил, сходил взял, etc. +- Realistic household items: food, groceries, household goods, toiletries, medicine, transport, cafe, restaurant +- Amount under 500 currency units for household purchases +- Gifts for household members ARE shared purchases +- Plans, wishes, future intent → silent (NOT purchases) +- Fantastical items (car, plane, island) or excessive amounts (>500) → chat_reply with playful response + +When classifying as "purchase": +- amountMinor in minor currency units (350 GEL → 35000, 3.50 → 350) +- Compute totals from quantity × price when needed, set amountSource="calculated" +- If user names specific household members as participants, return their memberIds +- Use clarification when amount, item, or intent is unclear but purchase seems likely + +=== PAYMENT TOPIC (topicRole=payments) === +If the message reports a completed rent or utility payment (payment verb + rent/utilities + amount), classify as "payment". +- Payment verbs: оплатил, paid, заплатил, перевёл, кинул, отправил +- Realistic amount for rent/utilities + +=== CHAT REPLIES === +CRITICAL: chat_reply replyText must NEVER claim a purchase or payment was saved, recorded, confirmed, or logged. The chat_reply route does NOT save anything. Only "purchase" and "payment" routes process real data. + +=== BOT ADDRESSING === +When the user addresses the bot (by any means), you MUST respond — never silent. +For bare summons ("бот?", "bot", "@kojori_bot"), use topic_helper to let the assistant greet. +For small talk or jokes directed at the bot, use chat_reply with a short playful response. +For questions that need household knowledge, use topic_helper. + +=== WORKFLOWS === +If there is an active clarification workflow and the user's message answers it, combine with context. +If user dismisses ("не, забей", "cancel"), use dismiss_workflow.` + }, + { + type: 'message', + role: 'user', + content: [contextSection, messagesSection, `Latest message:\n${input.messageText}`] + .filter(Boolean) + .join('\n\n') + } + ], + text: { + format: { + type: 'json_schema', + name: 'topic_processor_result', + schema: { + type: 'object', + additionalProperties: false, + properties: { + route: { + type: 'string', + enum: [ + 'silent', + 'chat_reply', + 'purchase', + 'purchase_clarification', + 'payment', + 'payment_clarification', + 'topic_helper', + 'dismiss_workflow' + ] + }, + replyText: { + anyOf: [{ type: 'string' }, { type: 'null' }] + }, + clarificationQuestion: { + anyOf: [{ type: 'string' }, { type: 'null' }] + }, + amountMinor: { + anyOf: [{ type: 'string' }, { type: 'null' }] + }, + currency: { + anyOf: [{ type: 'string', enum: ['GEL', 'USD'] }, { type: 'null' }] + }, + itemDescription: { + anyOf: [{ type: 'string' }, { type: 'null' }] + }, + amountSource: { + anyOf: [{ type: 'string', enum: ['explicit', 'calculated'] }, { type: 'null' }] + }, + calculationExplanation: { + anyOf: [{ type: 'string' }, { type: 'null' }] + }, + participantMemberIds: { + anyOf: [{ type: 'array', items: { type: 'string' } }, { type: 'null' }] + }, + kind: { + anyOf: [{ type: 'string', enum: ['rent', 'utilities'] }, { type: 'null' }] + }, + confidence: { + type: 'number', + minimum: 0, + maximum: 100 + }, + reason: { + anyOf: [{ type: 'string' }, { type: 'null' }] + } + }, + required: ['route', 'confidence', 'reason'] + } + } + } + }) + }) + + if (!response.ok) { + return null + } + + const payload = (await response.json()) as Record + const text = extractOpenAiResponseText(payload) + const parsed = parseJsonFromResponseText(text ?? '') + + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + return null + } + + const route = normalizeRoute(typeof parsed.route === 'string' ? parsed.route : 'silent') + const confidence = normalizeConfidence( + typeof parsed.confidence === 'number' ? parsed.confidence : 0 + ) + const reason = typeof parsed.reason === 'string' ? parsed.reason : 'unknown' + + switch (route) { + case 'silent': + return { route, reason } + + case 'chat_reply': { + const replyText = + typeof parsed.replyText === 'string' && parsed.replyText.trim().length > 0 + ? parsed.replyText.trim() + : null + if (!replyText) { + return { route: 'silent', reason: 'empty_chat_reply' } + } + return { route, replyText, reason } + } + + case 'purchase': { + const amountMinor = asOptionalBigInt(parsed.amountMinor ?? null) + const currency = normalizeCurrency(parsed.currency ?? null) + const itemDescription = + typeof parsed.itemDescription === 'string' && parsed.itemDescription.trim().length > 0 + ? parsed.itemDescription.trim() + : null + + if (!amountMinor || !currency || !itemDescription) { + return { + route: 'purchase_clarification', + clarificationQuestion: 'Could you clarify the purchase details?', + reason: 'missing_required_fields' + } + } + + const participantMemberIds = normalizeParticipantMemberIds( + parsed.participantMemberIds, + input.householdMembers + ) + + return { + route, + amountMinor: amountMinor.toString(), + currency, + itemDescription, + amountSource: parsed.amountSource === 'calculated' ? 'calculated' : 'explicit', + calculationExplanation: + typeof parsed.calculationExplanation === 'string' && + parsed.calculationExplanation.trim().length > 0 + ? parsed.calculationExplanation.trim() + : null, + participantMemberIds: participantMemberIds ? [...participantMemberIds] : null, + confidence, + reason + } + } + + case 'purchase_clarification': + case 'payment_clarification': { + const clarificationQuestion = + typeof parsed.clarificationQuestion === 'string' && + parsed.clarificationQuestion.trim().length > 0 + ? parsed.clarificationQuestion.trim() + : 'Could you clarify?' + return { route, clarificationQuestion, reason } + } + + case 'payment': { + const amountMinor = asOptionalBigInt(parsed.amountMinor ?? null) + const currency = normalizeCurrency(parsed.currency ?? null) + const kind = parsed.kind === 'rent' || parsed.kind === 'utilities' ? parsed.kind : null + + if (!amountMinor || !currency || !kind) { + return { + route: 'payment_clarification', + clarificationQuestion: 'Could you clarify the payment details?', + reason: 'missing_required_fields' + } + } + + return { + route, + kind, + amountMinor: amountMinor.toString(), + currency, + confidence, + reason + } + } + + case 'topic_helper': + return { route, reason } + + case 'dismiss_workflow': { + const replyText = + typeof parsed.replyText === 'string' && parsed.replyText.trim().length > 0 + ? parsed.replyText.trim() + : null + return { route, replyText, reason } + } + + default: + return { route: 'silent', reason: 'unknown_route' } + } + } catch { + return null + } finally { + clearTimeout(timeout) + } + } +} + +export function botSleepsMessage(locale: 'en' | 'ru' | string): string { + const enMessages = [ + '😴 Kojori is taking a quick nap... try again in a moment!', + '💤 The bot is recharging its circuits... be right back!', + '🌙 Kojori went to grab some digital coffee...', + '⚡ Power nap in progress... zzz...' + ] + const ruMessages = [ + '😴 Кожори немного вздремнул... попробуйте ещё раз через минутку!', + '💤 Бот подзаряжает свои схемы... скоро вернётся!', + '🌙 Кожори сбегал за цифровым кофе...', + '⚡ Идёт подзарядка... zzz...' + ] + + const messages = locale === 'ru' ? ruMessages : enMessages + return messages[Math.floor(Math.random() * messages.length)]! +} diff --git a/docs/runbooks/iac-terraform.md b/docs/runbooks/iac-terraform.md index 542fe76..ac15600 100644 --- a/docs/runbooks/iac-terraform.md +++ b/docs/runbooks/iac-terraform.md @@ -57,7 +57,7 @@ Keep bot runtime config that is not secret in your `*.tfvars` file: - `bot_mini_app_allowed_origins` - optional `bot_purchase_parser_model` - optional `bot_assistant_model` -- optional `bot_assistant_router_model` +- optional `bot_topic_processor_model` Set `bot_mini_app_allowed_origins` to the exact mini app origins you expect in each environment. Do not rely on permissive origin reflection in production.