feat(bot): unified topic processor replacing router+interpreter stack

Replace 3-layer architecture (gpt-5-nano router + gpt-4o-mini interpreter) with
single unified topic processor (gpt-4o-mini) for simplified message handling.

New components:
- HouseholdContextCache: TTL-based caching (5 min) for household config data
- TopicProcessor: Unified classification + parsing with structured JSON output

Key changes:
- Renamed ASSISTANT_ROUTER_MODEL → TOPIC_PROCESSOR_MODEL
- Added TOPIC_PROCESSOR_TIMEOUT_MS (default 10s)
- Refactored save() → saveWithInterpretation() for pre-parsed interpretations
- Removed deprecated createOpenAiTopicMessageRouter and ~300 lines legacy code
- Fixed typing indicator to only start when needed (purchase routes)
- Fixed amount formatting: convert minor units to major for rawText

Routes: silent, chat_reply, purchase, purchase_clarification, payment,
payment_clarification, topic_helper, dismiss_workflow

All 212 bot tests pass. Typecheck, lint, format, build clean.
This commit is contained in:
2026-03-14 13:33:57 +04:00
parent 9c3bb100e3
commit f38ee499ae
14 changed files with 1554 additions and 854 deletions

View File

@@ -29,6 +29,7 @@ import {
type TopicMessageRouter,
type TopicMessageRoutingResult
} from './topic-message-router'
import { asOptionalBigInt } from './topic-processor'
import {
persistTopicHistoryMessage,
telegramMessageIdFromMessage,
@@ -210,6 +211,9 @@ export type PurchaseProposalAmountCorrectionResult =
export interface PurchaseMessageIngestionRepository {
hasClarificationContext(record: PurchaseTopicRecord): Promise<boolean>
clearClarificationContext?(record: PurchaseTopicRecord): Promise<void>
/**
* @deprecated Use saveWithInterpretation instead. This method will be removed.
*/
save(
record: PurchaseTopicRecord,
interpreter?: PurchaseMessageInterpreter,
@@ -219,6 +223,10 @@ export interface PurchaseMessageIngestionRepository {
assistantTone?: string | null
}
): Promise<PurchaseMessageIngestionResult>
saveWithInterpretation(
record: PurchaseTopicRecord,
interpretation: PurchaseInterpretation
): Promise<PurchaseMessageIngestionResult>
confirm(
purchaseMessageId: string,
actorTelegramUserId: string
@@ -374,6 +382,37 @@ function normalizeInterpretation(
}
}
export function toPurchaseInterpretation(
result: import('./topic-processor').TopicProcessorPurchaseResult
): PurchaseInterpretation {
return {
decision: 'purchase',
amountMinor: asOptionalBigInt(result.amountMinor),
currency: result.currency,
itemDescription: result.itemDescription,
amountSource: result.amountSource,
calculationExplanation: result.calculationExplanation,
participantMemberIds: result.participantMemberIds,
confidence: result.confidence,
parserMode: 'llm',
clarificationQuestion: null
}
}
export function toPurchaseClarificationInterpretation(
result: import('./topic-processor').TopicProcessorClarificationResult
): PurchaseInterpretation {
return {
decision: 'clarification',
amountMinor: null,
currency: null,
itemDescription: null,
confidence: 0,
parserMode: 'llm',
clarificationQuestion: result.clarificationQuestion
}
}
function needsReviewAsInt(value: boolean): number {
return value ? 1 : 0
}
@@ -1206,6 +1245,119 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
}
},
async saveWithInterpretation(record, interpretation) {
const matchedMember = await db
.select({ id: schema.members.id })
.from(schema.members)
.where(
and(
eq(schema.members.householdId, record.householdId),
eq(schema.members.telegramUserId, record.senderTelegramUserId)
)
)
.limit(1)
const senderMemberId = matchedMember[0]?.id ?? null
const decision = normalizeInterpretation(interpretation, null)
const inserted = await db
.insert(schema.purchaseMessages)
.values({
householdId: record.householdId,
senderMemberId,
senderTelegramUserId: record.senderTelegramUserId,
senderDisplayName: record.senderDisplayName,
rawText: record.rawText,
telegramChatId: record.chatId,
telegramMessageId: record.messageId,
telegramThreadId: record.threadId,
telegramUpdateId: String(record.updateId),
messageSentAt: instantToDate(record.messageSentAt),
parsedAmountMinor: decision.parsedAmountMinor,
parsedCurrency: decision.parsedCurrency,
parsedItemDescription: decision.parsedItemDescription,
parserMode: decision.parserMode,
parserConfidence: decision.parserConfidence,
needsReview: needsReviewAsInt(decision.needsReview),
parserError: decision.parserError,
processingStatus: decision.status
})
.onConflictDoNothing({
target: [
schema.purchaseMessages.householdId,
schema.purchaseMessages.telegramChatId,
schema.purchaseMessages.telegramMessageId
]
})
.returning({ id: schema.purchaseMessages.id })
const insertedRow = inserted[0]
if (!insertedRow) {
return {
status: 'duplicate'
}
}
switch (decision.status) {
case 'ignored_not_purchase':
return {
status: 'ignored_not_purchase',
purchaseMessageId: insertedRow.id
}
case 'clarification_needed':
return {
status: 'clarification_needed',
purchaseMessageId: insertedRow.id,
clarificationQuestion: decision.clarificationQuestion,
parsedAmountMinor: decision.parsedAmountMinor,
parsedCurrency: decision.parsedCurrency,
parsedItemDescription: decision.parsedItemDescription,
amountSource: decision.amountSource,
calculationExplanation: decision.calculationExplanation,
parserConfidence: decision.parserConfidence,
parserMode: decision.parserMode
}
case 'pending_confirmation': {
const participants = await defaultProposalParticipants({
householdId: record.householdId,
senderTelegramUserId: record.senderTelegramUserId,
senderMemberId,
messageSentAt: record.messageSentAt,
explicitParticipantMemberIds: decision.participantMemberIds
})
if (participants.length > 0) {
await db.insert(schema.purchaseMessageParticipants).values(
participants.map((participant) => ({
purchaseMessageId: insertedRow.id,
memberId: participant.memberId,
included: participantIncludedAsInt(participant.included)
}))
)
}
return {
status: 'pending_confirmation',
purchaseMessageId: insertedRow.id,
parsedAmountMinor: decision.parsedAmountMinor!,
parsedCurrency: decision.parsedCurrency!,
parsedItemDescription: decision.parsedItemDescription!,
amountSource: decision.amountSource,
calculationExplanation: decision.calculationExplanation,
parserConfidence: decision.parserConfidence ?? MIN_PROPOSAL_CONFIDENCE,
parserMode: decision.parserMode ?? 'llm',
participants: toProposalParticipants(await getStoredParticipants(insertedRow.id))
}
}
case 'parse_failed':
return {
status: 'parse_failed',
purchaseMessageId: insertedRow.id
}
}
},
async confirm(purchaseMessageId, actorTelegramUserId) {
return mutateProposalStatus(purchaseMessageId, actorTelegramUserId, 'confirmed')
},
@@ -2194,6 +2346,8 @@ export function registerConfiguredPurchaseTopicIngestion(
options: {
interpreter?: PurchaseMessageInterpreter
router?: TopicMessageRouter
topicProcessor?: import('./topic-processor').TopicProcessor
contextCache?: import('./household-context-cache').HouseholdContextCache
memoryStore?: AssistantConversationMemoryStore
historyRepository?: TopicMessageHistoryRepository
logger?: Logger
@@ -2232,98 +2386,216 @@ export function registerConfiguredPurchaseTopicIngestion(
let typingIndicator: ReturnType<typeof startTypingIndicator> | null = null
try {
const [billingSettings, assistantConfig] = await Promise.all([
householdConfigurationRepository.getHouseholdBillingSettings(record.householdId),
resolveAssistantConfig(householdConfigurationRepository, record.householdId)
])
const locale = await resolveHouseholdLocale(
householdConfigurationRepository,
record.householdId
)
const route =
getCachedTopicMessageRoute(ctx, 'purchase') ??
(await routePurchaseTopicMessage({
ctx,
record,
locale,
repository,
router: options.router,
memoryStore: options.memoryStore,
historyRepository: options.historyRepository,
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone
}))
cacheTopicMessageRoute(ctx, 'purchase', route)
if (route.route === 'silent') {
rememberUserTurn(options.memoryStore, record)
await next()
return
}
if (route.shouldClearWorkflow) {
await repository.clearClarificationContext?.(record)
}
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
rememberUserTurn(options.memoryStore, record)
if (route.replyText) {
await replyToPurchaseMessage(ctx, route.replyText, undefined, {
repository: options.historyRepository,
record
// Load household context (cached)
const householdContext = options.contextCache
? await options.contextCache.get(record.householdId, async () => {
const [billingSettings, assistantConfig] = await Promise.all([
householdConfigurationRepository.getHouseholdBillingSettings(record.householdId),
resolveAssistantConfig(householdConfigurationRepository, record.householdId)
])
const locale = await resolveHouseholdLocale(
householdConfigurationRepository,
record.householdId
)
return {
householdContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone,
defaultCurrency: billingSettings.settlementCurrency,
locale,
cachedAt: Date.now()
}
})
rememberAssistantTurn(options.memoryStore, record, route.replyText)
: {
householdContext: null as string | null,
assistantTone: null as string | null,
defaultCurrency: 'GEL' as const,
locale: 'en' as BotLocale,
cachedAt: Date.now()
}
// Build conversation context
const activeWorkflow = (await repository.hasClarificationContext(record))
? 'purchase_clarification'
: null
const conversationContext = await buildConversationContext({
repository: options.historyRepository,
householdId: record.householdId,
telegramChatId: record.chatId,
telegramThreadId: record.threadId,
telegramUserId: record.senderTelegramUserId,
topicRole: 'purchase',
activeWorkflow,
messageText: record.rawText,
explicitMention: stripExplicitBotMention(ctx) !== null,
replyToBot: isReplyToCurrentBot(ctx),
directBotAddress: false,
memoryStore: options.memoryStore ?? {
get() {
return { summary: null, turns: [] }
},
appendTurn() {
return { summary: null, turns: [] }
}
}
return
}
})
if (route.route === 'topic_helper') {
await next()
return
}
// Get household members for the processor
const householdMembers = await (async () => {
if (!options.topicProcessor) return []
// This will be loaded from DB in the actual implementation
// For now, we return empty array - the processor will work without it
return []
})()
// Use topic processor if available, fall back to legacy router
if (options.topicProcessor) {
const processorResult = await options.topicProcessor({
locale: householdContext.locale === 'ru' ? 'ru' : 'en',
topicRole: 'purchase',
messageText: record.rawText,
isExplicitMention: conversationContext.explicitMention,
isReplyToBot: conversationContext.replyToBot,
activeWorkflow,
defaultCurrency: householdContext.defaultCurrency,
householdContext: householdContext.householdContext,
assistantTone: householdContext.assistantTone,
householdMembers,
senderMemberId: null, // Will be resolved in saveWithInterpretation
recentThreadMessages: conversationContext.recentThreadMessages.map((m) => ({
role: m.role,
speaker: m.speaker,
text: m.text
})),
recentChatMessages: conversationContext.recentSessionMessages.map((m) => ({
role: m.role,
speaker: m.speaker,
text: m.text
})),
recentTurns: conversationContext.recentTurns,
engagementAssessment: conversationContext.engagement
})
// Handle processor failure - fun "bot sleeps" message
if (!processorResult) {
const { botSleepsMessage } = await import('./topic-processor')
await replyToPurchaseMessage(
ctx,
botSleepsMessage(householdContext.locale === 'ru' ? 'ru' : 'en'),
undefined,
{
repository: options.historyRepository,
record
}
)
return
}
if (route.route !== 'purchase_candidate' && route.route !== 'purchase_followup') {
rememberUserTurn(options.memoryStore, record)
await next()
return
// Handle different routes
switch (processorResult.route) {
case 'silent': {
await next()
return
}
case 'chat_reply': {
await replyToPurchaseMessage(ctx, processorResult.replyText, undefined, {
repository: options.historyRepository,
record
})
rememberAssistantTurn(options.memoryStore, record, processorResult.replyText)
return
}
case 'topic_helper': {
await next()
return
}
case 'dismiss_workflow': {
await repository.clearClarificationContext?.(record)
if (processorResult.replyText) {
await replyToPurchaseMessage(ctx, processorResult.replyText, undefined, {
repository: options.historyRepository,
record
})
rememberAssistantTurn(options.memoryStore, record, processorResult.replyText)
}
return
}
case 'purchase_clarification': {
typingIndicator = startTypingIndicator(ctx)
const interpretation = toPurchaseClarificationInterpretation(processorResult)
const result = await repository.saveWithInterpretation(record, interpretation)
await handlePurchaseMessageResult(
ctx,
record,
result,
householdContext.locale,
options.logger,
null,
options.historyRepository
)
rememberAssistantTurn(
options.memoryStore,
record,
buildPurchaseAcknowledgement(result, householdContext.locale)
)
return
}
case 'purchase': {
typingIndicator = startTypingIndicator(ctx)
const interpretation = toPurchaseInterpretation(processorResult)
const pendingReply = await sendPurchaseProcessingReply(
ctx,
getBotTranslations(householdContext.locale).purchase.processing
)
const result = await repository.saveWithInterpretation(record, interpretation)
if (result.status === 'ignored_not_purchase') {
await repository.clearClarificationContext?.(record)
await next()
return
}
await handlePurchaseMessageResult(
ctx,
record,
result,
householdContext.locale,
options.logger,
pendingReply,
options.historyRepository
)
rememberAssistantTurn(
options.memoryStore,
record,
buildPurchaseAcknowledgement(result, householdContext.locale)
)
return
}
default: {
await next()
return
}
}
}
rememberUserTurn(options.memoryStore, record)
typingIndicator =
options.interpreter && route.shouldStartTyping ? startTypingIndicator(ctx) : null
const pendingReply =
options.interpreter && shouldShowProcessingReply(ctx, record, route)
? await sendPurchaseProcessingReply(ctx, getBotTranslations(locale).purchase.processing)
: null
const result = await repository.save(
record,
options.interpreter,
billingSettings.settlementCurrency,
{
householdContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone
}
)
if (result.status === 'ignored_not_purchase') {
if (route.route === 'purchase_followup') {
await repository.clearClarificationContext?.(record)
}
return await next()
}
await handlePurchaseMessageResult(
// No topic processor available - bot sleeps
const { botSleepsMessage } = await import('./topic-processor')
await replyToPurchaseMessage(
ctx,
record,
result,
locale,
options.logger,
pendingReply,
options.historyRepository
)
rememberAssistantTurn(
options.memoryStore,
record,
buildPurchaseAcknowledgement(result, locale)
botSleepsMessage(householdContext.locale === 'ru' ? 'ru' : 'en'),
undefined,
{
repository: options.historyRepository,
record
}
)
} catch (error) {
options.logger?.error(