feat(bot): add shared topic router

This commit is contained in:
2026-03-12 17:12:26 +04:00
parent 014d791bdc
commit 8374d18189
18 changed files with 1692 additions and 292 deletions

View File

@@ -0,0 +1,180 @@
export interface AssistantConversationTurn {
role: 'user' | 'assistant'
text: string
}
interface AssistantConversationState {
summary: string | null
turns: AssistantConversationTurn[]
}
const MEMORY_SUMMARY_MAX_CHARS = 1200
export interface AssistantConversationMemoryStore {
get(key: string): AssistantConversationState
appendTurn(key: string, turn: AssistantConversationTurn): AssistantConversationState
}
export interface AssistantRateLimitResult {
allowed: boolean
retryAfterMs: number
}
export interface AssistantRateLimiter {
consume(key: string): AssistantRateLimitResult
}
export interface AssistantUsageSnapshot {
householdId: string
telegramUserId: string
displayName: string
requestCount: number
inputTokens: number
outputTokens: number
totalTokens: number
updatedAt: string
}
export interface AssistantUsageTracker {
record(input: {
householdId: string
telegramUserId: string
displayName: string
usage: {
inputTokens: number
outputTokens: number
totalTokens: number
}
}): void
listHouseholdUsage(householdId: string): readonly AssistantUsageSnapshot[]
}
function summarizeTurns(
summary: string | null,
turns: readonly AssistantConversationTurn[]
): string {
const next = [summary, ...turns.map((turn) => `${turn.role}: ${turn.text}`)]
.filter(Boolean)
.join('\n')
return next.length <= MEMORY_SUMMARY_MAX_CHARS
? next
: next.slice(next.length - MEMORY_SUMMARY_MAX_CHARS)
}
export function conversationMemoryKey(input: {
telegramUserId: string
telegramChatId: string
isPrivateChat: boolean
}): string {
return input.isPrivateChat
? input.telegramUserId
: `group:${input.telegramChatId}:${input.telegramUserId}`
}
export function createInMemoryAssistantConversationMemoryStore(
maxTurns: number
): AssistantConversationMemoryStore {
const memory = new Map<string, AssistantConversationState>()
return {
get(key) {
return memory.get(key) ?? { summary: null, turns: [] }
},
appendTurn(key, turn) {
const current = memory.get(key) ?? { summary: null, turns: [] }
const nextTurns = [...current.turns, turn]
if (nextTurns.length <= maxTurns) {
const nextState = {
summary: current.summary,
turns: nextTurns
}
memory.set(key, nextState)
return nextState
}
const overflowCount = nextTurns.length - maxTurns
const overflow = nextTurns.slice(0, overflowCount)
const retained = nextTurns.slice(overflowCount)
const nextState = {
summary: summarizeTurns(current.summary, overflow),
turns: retained
}
memory.set(key, nextState)
return nextState
}
}
}
export function createInMemoryAssistantRateLimiter(config: {
burstLimit: number
burstWindowMs: number
rollingLimit: number
rollingWindowMs: number
}): AssistantRateLimiter {
const timestamps = new Map<string, number[]>()
return {
consume(key) {
const now = Date.now()
const events = (timestamps.get(key) ?? []).filter(
(timestamp) => now - timestamp < config.rollingWindowMs
)
const burstEvents = events.filter((timestamp) => now - timestamp < config.burstWindowMs)
if (burstEvents.length >= config.burstLimit) {
const oldestBurstEvent = burstEvents[0] ?? now
return {
allowed: false,
retryAfterMs: Math.max(1, config.burstWindowMs - (now - oldestBurstEvent))
}
}
if (events.length >= config.rollingLimit) {
const oldestEvent = events[0] ?? now
return {
allowed: false,
retryAfterMs: Math.max(1, config.rollingWindowMs - (now - oldestEvent))
}
}
events.push(now)
timestamps.set(key, events)
return {
allowed: true,
retryAfterMs: 0
}
}
}
}
export function createInMemoryAssistantUsageTracker(): AssistantUsageTracker {
const usage = new Map<string, AssistantUsageSnapshot>()
return {
record(input) {
const key = `${input.householdId}:${input.telegramUserId}`
const current = usage.get(key)
usage.set(key, {
householdId: input.householdId,
telegramUserId: input.telegramUserId,
displayName: input.displayName,
requestCount: (current?.requestCount ?? 0) + 1,
inputTokens: (current?.inputTokens ?? 0) + input.usage.inputTokens,
outputTokens: (current?.outputTokens ?? 0) + input.usage.outputTokens,
totalTokens: (current?.totalTokens ?? 0) + input.usage.totalTokens,
updatedAt: new Date().toISOString()
})
},
listHouseholdUsage(householdId) {
return [...usage.values()]
.filter((entry) => entry.householdId === householdId)
.sort((left, right) => right.totalTokens - left.totalTokens)
}
}
}

View File

@@ -15,9 +15,9 @@ export interface BotRuntimeConfig {
schedulerOidcAllowedEmails: readonly string[]
reminderJobsEnabled: boolean
openaiApiKey?: string
parserModel: string
purchaseParserModel: string
assistantModel: string
assistantRouterModel: string
assistantTimeoutMs: number
assistantMemoryMaxTurns: number
assistantRateLimitBurst: number
@@ -127,10 +127,9 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu
miniAppAuthEnabled,
schedulerOidcAllowedEmails,
reminderJobsEnabled,
parserModel: env.PARSER_MODEL?.trim() || 'gpt-4o-mini',
purchaseParserModel:
env.PURCHASE_PARSER_MODEL?.trim() || env.PARSER_MODEL?.trim() || 'gpt-4o-mini',
purchaseParserModel: env.PURCHASE_PARSER_MODEL?.trim() || 'gpt-4o-mini',
assistantModel: env.ASSISTANT_MODEL?.trim() || 'gpt-4o-mini',
assistantRouterModel: env.ASSISTANT_ROUTER_MODEL?.trim() || 'gpt-5-nano',
assistantTimeoutMs: parsePositiveInteger(
env.ASSISTANT_TIMEOUT_MS,
20_000,

View File

@@ -16,7 +16,10 @@ import {
createInMemoryAssistantUsageTracker,
registerDmAssistant
} from './dm-assistant'
import type { PurchaseMessageIngestionRepository } from './purchase-topic-ingestion'
import {
registerConfiguredPurchaseTopicIngestion,
type PurchaseMessageIngestionRepository
} from './purchase-topic-ingestion'
function createTestBot() {
const bot = createTelegramBot('000000:test-token')
@@ -264,6 +267,22 @@ function createHouseholdRepository(): HouseholdConfigurationRepository {
}
}
function createBoundHouseholdRepository(
role: 'purchase' | 'payments'
): HouseholdConfigurationRepository {
const repository = createHouseholdRepository()
return {
...repository,
findHouseholdTopicByTelegramContext: async () => ({
householdId: 'household-1',
role,
telegramThreadId: '777',
topicName: role === 'purchase' ? 'Purchases' : 'Payments'
})
}
}
function createFinanceService(): FinanceCommandService {
return {
getMemberByTelegramUserId: async () => ({
@@ -1243,6 +1262,186 @@ Confirm or cancel below.`,
})
})
test('uses the shared router for playful addressed topic replies without calling the full assistant', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
let assistantCalls = 0
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
if (method === 'sendMessage') {
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: -100123,
type: 'supergroup'
},
text: (payload as { text?: string }).text ?? 'ok'
}
} as never
}
return {
ok: true,
result: true
} as never
})
registerDmAssistant({
bot,
assistant: {
async respond() {
assistantCalls += 1
return {
text: 'Should not be called.',
usage: {
inputTokens: 10,
outputTokens: 2,
totalTokens: 12
}
}
}
},
topicRouter: async () => ({
route: 'chat_reply',
replyText: 'Тут. Если что-то реально купили, подключусь.',
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 96,
reason: 'smalltalk'
}),
purchaseRepository: createPurchaseRepository(),
purchaseInterpreter: async () => null,
householdConfigurationRepository: createHouseholdRepository(),
promptRepository: createPromptRepository(),
financeServiceForHousehold: () => createFinanceService(),
memoryStore: createInMemoryAssistantConversationMemoryStore(12),
rateLimiter: createInMemoryAssistantRateLimiter({
burstLimit: 5,
burstWindowMs: 60_000,
rollingLimit: 50,
rollingWindowMs: 86_400_000
}),
usageTracker: createInMemoryAssistantUsageTracker()
})
await bot.handleUpdate(topicMentionUpdate('@household_test_bot А ты тут?') as never)
expect(assistantCalls).toBe(0)
expect(calls).toHaveLength(1)
expect(calls[0]).toMatchObject({
method: 'sendMessage',
payload: {
text: 'Тут. Если что-то реально купили, подключусь.'
}
})
})
test('reuses the purchase-topic route instead of calling the shared router twice', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
let assistantCalls = 0
let routerCalls = 0
const householdConfigurationRepository = createBoundHouseholdRepository('purchase')
const topicRouter = async () => {
routerCalls += 1
return {
route: 'topic_helper' as const,
replyText: null,
helperKind: 'assistant' as const,
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 96,
reason: 'question'
}
}
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
if (method === 'sendMessage') {
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: -100123,
type: 'supergroup'
},
text: (payload as { text?: string }).text ?? 'ok'
}
} as never
}
return {
ok: true,
result: true
} as never
})
registerConfiguredPurchaseTopicIngestion(
bot,
householdConfigurationRepository,
createPurchaseRepository(),
{
router: topicRouter
}
)
registerDmAssistant({
bot,
assistant: {
async respond() {
assistantCalls += 1
return {
text: 'Still here.',
usage: {
inputTokens: 10,
outputTokens: 3,
totalTokens: 13
}
}
}
},
topicRouter,
purchaseRepository: createPurchaseRepository(),
purchaseInterpreter: async () => null,
householdConfigurationRepository,
promptRepository: createPromptRepository(),
financeServiceForHousehold: () => createFinanceService(),
memoryStore: createInMemoryAssistantConversationMemoryStore(12),
rateLimiter: createInMemoryAssistantRateLimiter({
burstLimit: 5,
burstWindowMs: 60_000,
rollingLimit: 50,
rollingWindowMs: 86_400_000
}),
usageTracker: createInMemoryAssistantUsageTracker()
})
await bot.handleUpdate(topicMentionUpdate('@household_test_bot how is life?') as never)
expect(routerCalls).toBe(1)
expect(assistantCalls).toBe(1)
expect(calls).toEqual(
expect.arrayContaining([
expect.objectContaining({
method: 'sendMessage',
payload: expect.objectContaining({
text: 'Still here.'
})
})
])
)
})
test('stays silent for regular group chatter when the bot is not addressed', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []

View File

@@ -10,7 +10,13 @@ import type { Bot, Context } from 'grammy'
import { resolveReplyLocale } from './bot-locale'
import { getBotTranslations, type BotLocale } from './i18n'
import type { AssistantReply, ConversationalAssistant } from './openai-chat-assistant'
import type {
AssistantConversationMemoryStore,
AssistantRateLimiter,
AssistantUsageTracker
} from './assistant-state'
import { conversationMemoryKey } from './assistant-state'
import type { ConversationalAssistant } from './openai-chat-assistant'
import type { PurchaseMessageInterpreter } from './openai-purchase-interpreter'
import {
formatPaymentBalanceReplyText,
@@ -25,9 +31,18 @@ import type {
PurchaseProposalActionResult,
PurchaseTopicRecord
} from './purchase-topic-ingestion'
import type { TopicMessageRouter, TopicMessageRole } from './topic-message-router'
import { fallbackTopicMessageRoute, getCachedTopicMessageRoute } from './topic-message-router'
import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions'
export type { AssistantConversationMemoryStore, AssistantUsageTracker } from './assistant-state'
export {
createInMemoryAssistantConversationMemoryStore,
createInMemoryAssistantRateLimiter,
createInMemoryAssistantUsageTracker
} from './assistant-state'
const ASSISTANT_PAYMENT_ACTION = 'assistant_payment_confirmation' as const
const ASSISTANT_PAYMENT_CONFIRM_CALLBACK_PREFIX = 'assistant_payment:confirm:'
const ASSISTANT_PAYMENT_CANCEL_CALLBACK_PREFIX = 'assistant_payment:cancel:'
@@ -35,57 +50,11 @@ const ASSISTANT_PURCHASE_CONFIRM_CALLBACK_PREFIX = 'assistant_purchase:confirm:'
const ASSISTANT_PURCHASE_CANCEL_CALLBACK_PREFIX = 'assistant_purchase:cancel:'
const DM_ASSISTANT_MESSAGE_SOURCE = 'telegram-dm-assistant'
const GROUP_ASSISTANT_MESSAGE_SOURCE = 'telegram-group-assistant'
const MEMORY_SUMMARY_MAX_CHARS = 1200
const PURCHASE_VERB_PATTERN =
/\b(?:bought|buy|got|picked up|spent|купил(?:а|и)?|взял(?:а|и)?|выложил(?:а|и)?|отдал(?:а|и)?|потратил(?:а|и)?)\b/iu
const PURCHASE_MONEY_PATTERN =
/(?:\d+(?:[.,]\d{1,2})?\s*(?:|gel|lari|лари|usd|\$|доллар(?:а|ов)?|кровн\p{L}*)|\b\d+(?:[.,]\d{1,2})\b)/iu
interface AssistantConversationTurn {
role: 'user' | 'assistant'
text: string
}
interface AssistantConversationState {
summary: string | null
turns: AssistantConversationTurn[]
}
export interface AssistantConversationMemoryStore {
get(key: string): AssistantConversationState
appendTurn(key: string, turn: AssistantConversationTurn): AssistantConversationState
}
export interface AssistantRateLimitResult {
allowed: boolean
retryAfterMs: number
}
export interface AssistantRateLimiter {
consume(key: string): AssistantRateLimitResult
}
export interface AssistantUsageSnapshot {
householdId: string
telegramUserId: string
displayName: string
requestCount: number
inputTokens: number
outputTokens: number
totalTokens: number
updatedAt: string
}
export interface AssistantUsageTracker {
record(input: {
householdId: string
telegramUserId: string
displayName: string
usage: AssistantReply['usage']
}): void
listHouseholdUsage(householdId: string): readonly AssistantUsageSnapshot[]
}
type PurchaseActionResult = Extract<
PurchaseProposalActionResult,
{ status: 'confirmed' | 'already_confirmed' | 'cancelled' | 'already_cancelled' }
@@ -132,136 +101,6 @@ function isReplyToBotMessage(ctx: Context): boolean {
return replyAuthor.id === ctx.me.id
}
function summarizeTurns(
summary: string | null,
turns: readonly AssistantConversationTurn[]
): string {
const next = [summary, ...turns.map((turn) => `${turn.role}: ${turn.text}`)]
.filter(Boolean)
.join('\n')
return next.length <= MEMORY_SUMMARY_MAX_CHARS
? next
: next.slice(next.length - MEMORY_SUMMARY_MAX_CHARS)
}
function conversationMemoryKey(input: {
telegramUserId: string
telegramChatId: string
isPrivateChat: boolean
}): string {
return input.isPrivateChat
? input.telegramUserId
: `group:${input.telegramChatId}:${input.telegramUserId}`
}
export function createInMemoryAssistantConversationMemoryStore(
maxTurns: number
): AssistantConversationMemoryStore {
const memory = new Map<string, AssistantConversationState>()
return {
get(key) {
return memory.get(key) ?? { summary: null, turns: [] }
},
appendTurn(key, turn) {
const current = memory.get(key) ?? { summary: null, turns: [] }
const nextTurns = [...current.turns, turn]
if (nextTurns.length <= maxTurns) {
const nextState = {
summary: current.summary,
turns: nextTurns
}
memory.set(key, nextState)
return nextState
}
const overflowCount = nextTurns.length - maxTurns
const overflow = nextTurns.slice(0, overflowCount)
const retained = nextTurns.slice(overflowCount)
const nextState = {
summary: summarizeTurns(current.summary, overflow),
turns: retained
}
memory.set(key, nextState)
return nextState
}
}
}
export function createInMemoryAssistantRateLimiter(config: {
burstLimit: number
burstWindowMs: number
rollingLimit: number
rollingWindowMs: number
}): AssistantRateLimiter {
const timestamps = new Map<string, number[]>()
return {
consume(key) {
const now = Date.now()
const events = (timestamps.get(key) ?? []).filter(
(timestamp) => now - timestamp < config.rollingWindowMs
)
const burstEvents = events.filter((timestamp) => now - timestamp < config.burstWindowMs)
if (burstEvents.length >= config.burstLimit) {
const oldestBurstEvent = burstEvents[0] ?? now
return {
allowed: false,
retryAfterMs: Math.max(1, config.burstWindowMs - (now - oldestBurstEvent))
}
}
if (events.length >= config.rollingLimit) {
const oldestEvent = events[0] ?? now
return {
allowed: false,
retryAfterMs: Math.max(1, config.rollingWindowMs - (now - oldestEvent))
}
}
events.push(now)
timestamps.set(key, events)
return {
allowed: true,
retryAfterMs: 0
}
}
}
}
export function createInMemoryAssistantUsageTracker(): AssistantUsageTracker {
const usage = new Map<string, AssistantUsageSnapshot>()
return {
record(input) {
const key = `${input.householdId}:${input.telegramUserId}`
const current = usage.get(key)
usage.set(key, {
householdId: input.householdId,
telegramUserId: input.telegramUserId,
displayName: input.displayName,
requestCount: (current?.requestCount ?? 0) + 1,
inputTokens: (current?.inputTokens ?? 0) + input.usage.inputTokens,
outputTokens: (current?.outputTokens ?? 0) + input.usage.outputTokens,
totalTokens: (current?.totalTokens ?? 0) + input.usage.totalTokens,
updatedAt: new Date().toISOString()
})
},
listHouseholdUsage(householdId) {
return [...usage.values()]
.filter((entry) => entry.householdId === householdId)
.sort((left, right) => right.totalTokens - left.totalTokens)
}
}
}
function formatRetryDelay(locale: BotLocale, retryAfterMs: number): string {
const t = getBotTranslations(locale).assistant
const roundedMinutes = Math.ceil(retryAfterMs / 60_000)
@@ -476,6 +315,45 @@ async function resolveAssistantConfig(
}
}
async function routeGroupAssistantMessage(input: {
router: TopicMessageRouter | undefined
locale: BotLocale
topicRole: TopicMessageRole
messageText: string
isExplicitMention: boolean
isReplyToBot: boolean
assistantContext: string | null
assistantTone: string | null
memoryStore: AssistantConversationMemoryStore
memoryKey: string
}) {
if (!input.router) {
return fallbackTopicMessageRoute({
locale: input.locale,
topicRole: input.topicRole,
messageText: input.messageText,
isExplicitMention: input.isExplicitMention,
isReplyToBot: input.isReplyToBot,
activeWorkflow: null,
assistantContext: input.assistantContext,
assistantTone: input.assistantTone,
recentTurns: input.memoryStore.get(input.memoryKey).turns
})
}
return input.router({
locale: input.locale,
topicRole: input.topicRole,
messageText: input.messageText,
isExplicitMention: input.isExplicitMention,
isReplyToBot: input.isReplyToBot,
activeWorkflow: null,
assistantContext: input.assistantContext,
assistantTone: input.assistantTone,
recentTurns: input.memoryStore.get(input.memoryKey).turns
})
}
function formatAssistantLedger(
dashboard: NonNullable<Awaited<ReturnType<FinanceCommandService['generateDashboard']>>>
) {
@@ -699,6 +577,7 @@ async function replyWithAssistant(input: {
export function registerDmAssistant(options: {
bot: Bot
assistant?: ConversationalAssistant
topicRouter?: TopicMessageRouter
purchaseRepository?: PurchaseMessageIngestionRepository
purchaseInterpreter?: PurchaseMessageInterpreter
householdConfigurationRepository: HouseholdConfigurationRepository
@@ -1267,25 +1146,21 @@ export function registerDmAssistant(options: {
await next()
return
}
if (
!isAddressed &&
const binding =
ctx.msg &&
'is_topic_message' in ctx.msg &&
ctx.msg.is_topic_message === true &&
'message_thread_id' in ctx.msg &&
ctx.msg.message_thread_id !== undefined
) {
const binding =
await options.householdConfigurationRepository.findHouseholdTopicByTelegramContext({
telegramChatId,
telegramThreadId: ctx.msg.message_thread_id.toString()
})
? await options.householdConfigurationRepository.findHouseholdTopicByTelegramContext({
telegramChatId,
telegramThreadId: ctx.msg.message_thread_id.toString()
})
: null
if (binding) {
await next()
return
}
if (binding && !isAddressed) {
await next()
return
}
const member = await options.householdConfigurationRepository.getHouseholdMember(
@@ -1330,22 +1205,78 @@ export function registerDmAssistant(options: {
}
try {
const financeService = options.financeServiceForHousehold(household.householdId)
const [settings, assistantConfig] = await Promise.all([
options.householdConfigurationRepository.getHouseholdBillingSettings(household.householdId),
resolveAssistantConfig(options.householdConfigurationRepository, household.householdId)
])
const memoryKey = conversationMemoryKey({
telegramUserId,
telegramChatId,
isPrivateChat: false
})
const messageText = mention?.strippedText ?? ctx.msg.text.trim()
const assistantConfig = await resolveAssistantConfig(
options.householdConfigurationRepository,
household.householdId
)
const topicRole: TopicMessageRole =
binding?.role === 'purchase' ||
binding?.role === 'payments' ||
binding?.role === 'reminders' ||
binding?.role === 'feedback'
? binding.role
: 'generic'
const cachedRoute =
topicRole === 'purchase' || topicRole === 'payments'
? getCachedTopicMessageRoute(ctx, topicRole)
: null
const route =
cachedRoute ??
(options.topicRouter
? await routeGroupAssistantMessage({
router: options.topicRouter,
locale,
topicRole,
messageText,
isExplicitMention: Boolean(mention),
isReplyToBot: isReplyToBotMessage(ctx),
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone,
memoryStore: options.memoryStore,
memoryKey
})
: null)
if (options.purchaseRepository && options.purchaseInterpreter) {
if (route) {
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
if (route.replyText) {
options.memoryStore.appendTurn(memoryKey, {
role: 'user',
text: messageText
})
options.memoryStore.appendTurn(memoryKey, {
role: 'assistant',
text: route.replyText
})
await ctx.reply(route.replyText)
}
return
}
if (route.route === 'silent') {
await next()
return
}
}
const financeService = options.financeServiceForHousehold(household.householdId)
const settings = await options.householdConfigurationRepository.getHouseholdBillingSettings(
household.householdId
)
if (!binding && options.purchaseRepository && options.purchaseInterpreter) {
const purchaseRecord = createGroupPurchaseRecord(ctx, household.householdId, messageText)
if (purchaseRecord) {
if (
purchaseRecord &&
(!route || route.route === 'purchase_candidate' || route.route === 'topic_helper')
) {
const purchaseResult = await options.purchaseRepository.save(
purchaseRecord,
options.purchaseInterpreter,
@@ -1373,15 +1304,7 @@ export function registerDmAssistant(options: {
await ctx.reply(buildPurchaseClarificationText(locale, purchaseResult))
return
}
if (!isAddressed) {
await next()
return
}
}
} else if (!isAddressed) {
await next()
return
}
if (!isAddressed || messageText.length === 0) {

View File

@@ -35,6 +35,7 @@ import { getBotRuntimeConfig } from './config'
import { registerHouseholdSetupCommands } from './household-setup'
import { createOpenAiChatAssistant } from './openai-chat-assistant'
import { createOpenAiPurchaseInterpreter } from './openai-purchase-interpreter'
import { createOpenAiTopicMessageRouter } from './topic-message-router'
import {
createPurchaseMessageRepository,
registerConfiguredPurchaseTopicIngestion
@@ -145,6 +146,11 @@ const conversationalAssistant = createOpenAiChatAssistant(
runtime.assistantModel,
runtime.assistantTimeoutMs
)
const topicMessageRouter = createOpenAiTopicMessageRouter(
runtime.openaiApiKey,
runtime.assistantRouterModel,
Math.min(runtime.assistantTimeoutMs, 5_000)
)
const anonymousFeedbackRepositoryClients = new Map<
string,
ReturnType<typeof createDbAnonymousFeedbackRepository>
@@ -237,6 +243,12 @@ if (purchaseRepositoryClient && householdConfigurationRepositoryClient) {
householdConfigurationRepositoryClient.repository,
purchaseRepositoryClient.repository,
{
...(topicMessageRouter
? {
router: topicMessageRouter,
memoryStore: assistantMemoryStore
}
: {}),
...(purchaseInterpreter
? {
interpreter: purchaseInterpreter
@@ -253,6 +265,12 @@ if (purchaseRepositoryClient && householdConfigurationRepositoryClient) {
financeServiceForHousehold,
paymentConfirmationServiceForHousehold,
{
...(topicMessageRouter
? {
router: topicMessageRouter,
memoryStore: assistantMemoryStore
}
: {}),
logger: getLogger('payment-ingestion')
}
)
@@ -432,6 +450,11 @@ if (
assistant: conversationalAssistant
}
: {}),
...(topicMessageRouter
? {
topicRouter: topicMessageRouter
}
: {}),
logger: getLogger('dm-assistant')
})
} else {
@@ -458,6 +481,11 @@ if (
assistant: conversationalAssistant
}
: {}),
...(topicMessageRouter
? {
topicRouter: topicMessageRouter
}
: {}),
logger: getLogger('dm-assistant')
})
}

View File

@@ -29,12 +29,17 @@ export interface ConversationalAssistant {
const ASSISTANT_SYSTEM_PROMPT = [
'You are Kojori, a household finance assistant for one specific household.',
'Stay within the provided household context and recent conversation context.',
'Be calm, concise, playful when appropriate, and quiet by default.',
'Do not act like a form validator or aggressive parser.',
'Do not invent balances, members, billing periods, or completed actions.',
'If the user asks you to mutate household state, do not claim the action is complete unless the system explicitly says it was confirmed and saved.',
'For unsupported writes, explain the limitation briefly and suggest the explicit command or confirmation flow.',
'Prefer concise, practical answers.',
'Default to one to three short sentences.',
'For simple greetings or small talk, reply in a single short sentence unless the user asks for more.',
'If the user is joking or testing you, you may answer playfully in one short sentence.',
'If the user tells you to stop, back off briefly and do not keep asking follow-up questions.',
'Do not repeat the same clarification after the user declines, backs off, or says they are only thinking.',
'Do not restate the full household context unless the user explicitly asks for details.',
'Avoid bullet lists unless the user asked for a list or several distinct items.',
'Reply in the user language inferred from the latest user message and locale context.'

View File

@@ -25,13 +25,13 @@ function candidate(overrides: Partial<PaymentTopicCandidate> = {}): PaymentTopic
}
}
function paymentUpdate(text: string) {
function paymentUpdate(text: string, threadId = 888) {
return {
update_id: 1001,
message: {
message_id: 55,
date: Math.floor(Date.now() / 1000),
message_thread_id: 888,
message_thread_id: threadId,
is_topic_message: true,
chat: {
id: -10012345,
@@ -646,4 +646,127 @@ describe('registerConfiguredPaymentTopicIngestion', () => {
text: expect.stringContaining('Я могу записать эту оплату аренды: 472.50 GEL.')
})
})
test('uses router for playful addressed replies in the payments topic', async () => {
const bot = createTelegramBot('000000:test-token')
const calls: Array<{ method: string; payload: unknown }> = []
const promptRepository = createPromptRepository()
bot.botInfo = {
id: 999000,
is_bot: true,
first_name: 'Household Test Bot',
username: 'household_test_bot',
can_join_groups: true,
can_read_all_group_messages: false,
supports_inline_queries: false,
can_connect_to_business: false,
has_main_web_app: false,
has_topics_enabled: true,
allows_users_to_create_topics: false
}
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: true
} as never
})
registerConfiguredPaymentTopicIngestion(
bot,
createHouseholdRepository() as never,
promptRepository,
() => createFinanceService(),
() => createPaymentConfirmationService(),
{
router: async () => ({
route: 'chat_reply',
replyText: 'Тут. Если это про оплату, разберёмся.',
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 94,
reason: 'smalltalk'
})
}
)
await bot.handleUpdate(paymentUpdate('@household_test_bot а ты тут?') as never)
expect(calls).toHaveLength(1)
expect(calls[0]).toMatchObject({
method: 'sendMessage',
payload: {
text: 'Тут. Если это про оплату, разберёмся.'
}
})
})
test('keeps a pending payment workflow in another thread when dismissing here', async () => {
const bot = createTelegramBot('000000:test-token')
const promptRepository = createPromptRepository()
bot.botInfo = {
id: 999000,
is_bot: true,
first_name: 'Household Test Bot',
username: 'household_test_bot',
can_join_groups: true,
can_read_all_group_messages: false,
supports_inline_queries: false,
can_connect_to_business: false,
has_main_web_app: false,
has_topics_enabled: true,
allows_users_to_create_topics: false
}
bot.api.config.use(async () => {
return {
ok: true,
result: true
} as never
})
await promptRepository.upsertPendingAction({
telegramUserId: '10002',
telegramChatId: '-10012345',
action: 'payment_topic_clarification',
payload: {
threadId: '999',
rawText: 'За жилье отправил'
},
expiresAt: null
})
registerConfiguredPaymentTopicIngestion(
bot,
createHouseholdRepository() as never,
promptRepository,
() => createFinanceService(),
() => createPaymentConfirmationService(),
{
router: async () => ({
route: 'dismiss_workflow',
replyText: 'Окей, молчу.',
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: true,
confidence: 97,
reason: 'backoff'
})
}
)
await bot.handleUpdate(paymentUpdate('@household_test_bot stop', 888) as never)
expect(await promptRepository.getPendingAction('-10012345', '10002')).toMatchObject({
action: 'payment_topic_clarification',
payload: {
threadId: '999',
rawText: 'За жилье отправил'
}
})
})
})

View File

@@ -9,12 +9,21 @@ import type {
} from '@household/ports'
import { getBotTranslations, type BotLocale } from './i18n'
import type { AssistantConversationMemoryStore } from './assistant-state'
import { conversationMemoryKey } from './assistant-state'
import {
formatPaymentBalanceReplyText,
formatPaymentProposalText,
maybeCreatePaymentBalanceReply,
maybeCreatePaymentProposal,
parsePaymentProposalPayload,
synthesizePaymentConfirmationText
} from './payment-proposals'
import {
cacheTopicMessageRoute,
getCachedTopicMessageRoute,
type TopicMessageRouter
} from './topic-message-router'
import { stripExplicitBotMention } from './telegram-mentions'
const PAYMENT_TOPIC_CONFIRM_CALLBACK_PREFIX = 'payment_topic:confirm:'
@@ -94,6 +103,15 @@ function attachmentCount(ctx: Context): number {
return 0
}
function isReplyToBotMessage(ctx: Context): boolean {
const replyAuthor = ctx.msg?.reply_to_message?.from
if (!replyAuthor) {
return false
}
return replyAuthor.id === ctx.me.id
}
function toCandidateFromContext(ctx: Context): PaymentTopicCandidate | null {
const message = ctx.message
const rawText = stripExplicitBotMention(ctx)?.strippedText ?? readMessageText(ctx)
@@ -150,6 +168,99 @@ export function resolveConfiguredPaymentTopicRecord(
}
}
async function resolveAssistantConfig(
householdConfigurationRepository: HouseholdConfigurationRepository,
householdId: string
): Promise<{
assistantContext: string | null
assistantTone: string | null
}> {
const config = householdConfigurationRepository.getHouseholdAssistantConfig
? await householdConfigurationRepository.getHouseholdAssistantConfig(householdId)
: null
return {
assistantContext: config?.assistantContext ?? null,
assistantTone: config?.assistantTone ?? null
}
}
function memoryKeyForRecord(record: PaymentTopicRecord): string {
return conversationMemoryKey({
telegramUserId: record.senderTelegramUserId,
telegramChatId: record.chatId,
isPrivateChat: false
})
}
function appendConversation(
memoryStore: AssistantConversationMemoryStore | undefined,
record: PaymentTopicRecord,
userText: string,
assistantText: string
): void {
if (!memoryStore) {
return
}
const key = memoryKeyForRecord(record)
memoryStore.appendTurn(key, {
role: 'user',
text: userText
})
memoryStore.appendTurn(key, {
role: 'assistant',
text: assistantText
})
}
async function routePaymentTopicMessage(input: {
record: PaymentTopicRecord
locale: BotLocale
topicRole: 'payments'
isExplicitMention: boolean
isReplyToBot: boolean
activeWorkflow: 'payment_clarification' | 'payment_confirmation' | null
assistantContext: string | null
assistantTone: string | null
memoryStore: AssistantConversationMemoryStore | undefined
router: TopicMessageRouter | undefined
}) {
if (!input.router) {
return input.activeWorkflow
? {
route: 'payment_followup' as const,
replyText: null,
helperKind: 'payment' as const,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 75,
reason: 'legacy_payment_followup'
}
: {
route: 'payment_candidate' as const,
replyText: null,
helperKind: 'payment' as const,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 75,
reason: 'legacy_payment_candidate'
}
}
return input.router({
locale: input.locale,
topicRole: input.topicRole,
messageText: input.record.rawText,
isExplicitMention: input.isExplicitMention,
isReplyToBot: input.isReplyToBot,
activeWorkflow: input.activeWorkflow,
assistantContext: input.assistantContext,
assistantTone: input.assistantTone,
recentTurns: input.memoryStore?.get(memoryKeyForRecord(input.record)).turns ?? []
})
}
export function buildPaymentAcknowledgement(
locale: BotLocale,
result:
@@ -265,6 +376,8 @@ export function registerConfiguredPaymentTopicIngestion(
financeServiceForHousehold: (householdId: string) => FinanceCommandService,
paymentServiceForHousehold: (householdId: string) => PaymentConfirmationService,
options: {
router?: TopicMessageRouter
memoryStore?: AssistantConversationMemoryStore
logger?: Logger
} = {}
): void {
@@ -422,9 +535,6 @@ export function registerConfiguredPaymentTopicIngestion(
try {
const locale = await resolveTopicLocale(ctx, householdConfigurationRepository)
const t = getBotTranslations(locale).payments
const financeService = financeServiceForHousehold(record.householdId)
const member = await financeService.getMemberByTelegramUserId(record.senderTelegramUserId)
const pending = await promptRepository.getPendingAction(
record.chatId,
record.senderTelegramUserId
@@ -437,6 +547,88 @@ export function registerConfiguredPaymentTopicIngestion(
clarificationPayload && clarificationPayload.threadId === record.threadId
? `${clarificationPayload.rawText}\n${record.rawText}`
: record.rawText
const confirmationPayload =
pending?.action === PAYMENT_TOPIC_CONFIRMATION_ACTION
? parsePaymentTopicConfirmationPayload(pending.payload)
: null
const assistantConfig = await resolveAssistantConfig(
householdConfigurationRepository,
record.householdId
)
const activeWorkflow =
clarificationPayload && clarificationPayload.threadId === record.threadId
? 'payment_clarification'
: confirmationPayload && confirmationPayload.telegramThreadId === record.threadId
? 'payment_confirmation'
: null
const route =
getCachedTopicMessageRoute(ctx, 'payments') ??
(await routePaymentTopicMessage({
record,
locale,
topicRole: 'payments',
isExplicitMention: stripExplicitBotMention(ctx) !== null,
isReplyToBot: isReplyToBotMessage(ctx),
activeWorkflow,
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone,
memoryStore: options.memoryStore,
router: options.router
}))
cacheTopicMessageRoute(ctx, 'payments', route)
if (route.route === 'silent') {
await next()
return
}
if (route.shouldClearWorkflow && activeWorkflow !== null) {
await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId)
}
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
if (route.replyText) {
await replyToPaymentMessage(ctx, route.replyText)
appendConversation(options.memoryStore, record, record.rawText, route.replyText)
}
return
}
if (route.route === 'topic_helper') {
const financeService = financeServiceForHousehold(record.householdId)
const member = await financeService.getMemberByTelegramUserId(record.senderTelegramUserId)
if (!member) {
await next()
return
}
const balanceReply = await maybeCreatePaymentBalanceReply({
rawText: combinedText,
householdId: record.householdId,
memberId: member.id,
financeService,
householdConfigurationRepository
})
if (!balanceReply) {
await next()
return
}
const helperText = formatPaymentBalanceReplyText(locale, balanceReply)
await replyToPaymentMessage(ctx, helperText)
appendConversation(options.memoryStore, record, record.rawText, helperText)
return
}
if (route.route !== 'payment_candidate' && route.route !== 'payment_followup') {
await next()
return
}
const t = getBotTranslations(locale).payments
const financeService = financeServiceForHousehold(record.householdId)
const member = await financeService.getMemberByTelegramUserId(record.senderTelegramUserId)
if (!member) {
await next()
@@ -469,6 +661,7 @@ export function registerConfiguredPaymentTopicIngestion(
})
await replyToPaymentMessage(ctx, t.clarification)
appendConversation(options.memoryStore, record, record.rawText, t.clarification)
return
}
@@ -476,11 +669,13 @@ export function registerConfiguredPaymentTopicIngestion(
if (proposal.status === 'unsupported_currency') {
await replyToPaymentMessage(ctx, t.unsupportedCurrency)
appendConversation(options.memoryStore, record, record.rawText, t.unsupportedCurrency)
return
}
if (proposal.status === 'no_balance') {
await replyToPaymentMessage(ctx, t.noBalance)
appendConversation(options.memoryStore, record, record.rawText, t.noBalance)
return
}
@@ -502,15 +697,17 @@ export function registerConfiguredPaymentTopicIngestion(
expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS })
})
const proposalText = formatPaymentProposalText({
locale,
surface: 'topic',
proposal
})
await replyToPaymentMessage(
ctx,
formatPaymentProposalText({
locale,
surface: 'topic',
proposal
}),
proposalText,
paymentProposalReplyMarkup(locale, proposal.payload.proposalId)
)
appendConversation(options.memoryStore, record, record.rawText, proposalText)
}
} catch (error) {
options.logger?.error(

View File

@@ -1232,6 +1232,136 @@ Confirm or cancel below.`
})
})
test('replies playfully to addressed banter with router and skips purchase save', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
let saveCalls = 0
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: Number(config.householdChatId),
type: 'supergroup'
},
text: 'ok'
}
} as never
})
const repository: PurchaseMessageIngestionRepository = {
async hasClarificationContext() {
return false
},
async save() {
saveCalls += 1
throw new Error('not used')
},
async confirm() {
throw new Error('not used')
},
async cancel() {
throw new Error('not used')
},
async toggleParticipant() {
throw new Error('not used')
}
}
registerPurchaseTopicIngestion(bot, config, repository, {
router: async () => ({
route: 'chat_reply',
replyText: 'Тут. Если что-то реально купили, подключусь.',
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 95,
reason: 'smalltalk'
})
})
await bot.handleUpdate(purchaseUpdate('@household_test_bot А ты тут?') as never)
expect(saveCalls).toBe(0)
expect(calls).toHaveLength(1)
expect(calls[0]).toMatchObject({
method: 'sendMessage',
payload: {
text: 'Тут. Если что-то реально купили, подключусь.'
}
})
})
test('clears active purchase clarification when router dismisses the workflow', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
let clearCalls = 0
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: Number(config.householdChatId),
type: 'supergroup'
},
text: 'ok'
}
} as never
})
const repository: PurchaseMessageIngestionRepository = {
async hasClarificationContext() {
return true
},
async clearClarificationContext() {
clearCalls += 1
},
async save() {
throw new Error('not used')
},
async confirm() {
throw new Error('not used')
},
async cancel() {
throw new Error('not used')
},
async toggleParticipant() {
throw new Error('not used')
}
}
registerPurchaseTopicIngestion(bot, config, repository, {
router: async () => ({
route: 'dismiss_workflow',
replyText: 'Окей, молчу.',
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: true,
confidence: 98,
reason: 'backoff'
})
})
await bot.handleUpdate(purchaseUpdate('Отстань') as never)
expect(clearCalls).toBe(1)
expect(calls).toHaveLength(1)
expect(calls[0]).toMatchObject({
method: 'sendMessage',
payload: {
text: 'Окей, молчу.'
}
})
})
test('continues purchase handling for replies to bot messages without a fresh mention', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []

View File

@@ -9,11 +9,19 @@ import type {
import { createDbClient, schema } from '@household/db'
import { getBotTranslations, type BotLocale } from './i18n'
import type { AssistantConversationMemoryStore } from './assistant-state'
import { conversationMemoryKey } from './assistant-state'
import type {
PurchaseInterpretationAmountSource,
PurchaseInterpretation,
PurchaseMessageInterpreter
} from './openai-purchase-interpreter'
import {
cacheTopicMessageRoute,
getCachedTopicMessageRoute,
type TopicMessageRouter,
type TopicMessageRoutingResult
} from './topic-message-router'
import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions'
@@ -30,20 +38,6 @@ const MONEY_SIGNAL_PATTERN =
/\b\d+(?:[.,]\d{1,2})?\s*(?:|gel|lari|usd|\$)\b|\d+(?:[.,]\d{1,2})?\s*(?:лари|лри|tetri|тетри|доллар(?:а|ов)?)(?=$|[^\p{L}])|\b(?:for|за|на|до)\s+\d+(?:[.,]\d{1,2})?\b|\b(?:paid|spent)\s+\d+(?:[.,]\d{1,2})?\b|(?:^|[^\p{L}])(?:заплатил(?:а|и)?|потратил(?:а|и)?|отдал(?:а|и)?|выложил(?:а|и)?|сторговался(?:\s+до)?)(?:\s+\d+(?:[.,]\d{1,2})?|\s+до\s+\d+(?:[.,]\d{1,2})?)(?=$|[^\p{L}])/iu
const STANDALONE_NUMBER_PATTERN = /\b\d+(?:[.,]\d{1,2})?\b/gu
type PurchaseTopicEngagement =
| {
kind: 'direct'
showProcessingReply: boolean
}
| {
kind: 'clarification'
showProcessingReply: boolean
}
| {
kind: 'likely_purchase'
showProcessingReply: true
}
type StoredPurchaseProcessingStatus =
| 'pending_confirmation'
| 'clarification_needed'
@@ -202,6 +196,7 @@ export type PurchaseProposalAmountCorrectionResult =
export interface PurchaseMessageIngestionRepository {
hasClarificationContext(record: PurchaseTopicRecord): Promise<boolean>
clearClarificationContext?(record: PurchaseTopicRecord): Promise<void>
save(
record: PurchaseTopicRecord,
interpreter?: PurchaseMessageInterpreter,
@@ -285,36 +280,6 @@ function looksLikeLikelyCompletedPurchase(rawText: string): boolean {
return Array.from(rawText.matchAll(STANDALONE_NUMBER_PATTERN)).length === 1
}
async function resolvePurchaseTopicEngagement(
ctx: Pick<Context, 'msg' | 'me'>,
record: PurchaseTopicRecord,
repository: Pick<PurchaseMessageIngestionRepository, 'hasClarificationContext'>
): Promise<PurchaseTopicEngagement | null> {
const hasExplicitMention = stripExplicitBotMention(ctx) !== null
if (hasExplicitMention || isReplyToCurrentBot(ctx)) {
return {
kind: 'direct',
showProcessingReply: looksLikeLikelyCompletedPurchase(record.rawText)
}
}
if (await repository.hasClarificationContext(record)) {
return {
kind: 'clarification',
showProcessingReply: false
}
}
if (looksLikeLikelyCompletedPurchase(record.rawText)) {
return {
kind: 'likely_purchase',
showProcessingReply: true
}
}
return null
}
function normalizeInterpretation(
interpretation: PurchaseInterpretation | null,
parserError: string | null
@@ -516,6 +481,22 @@ async function sendPurchaseProcessingReply(
}
}
function shouldShowProcessingReply(
ctx: Pick<Context, 'msg' | 'me'>,
record: PurchaseTopicRecord,
route: TopicMessageRoutingResult
): boolean {
if (route.route !== 'purchase_candidate' || !route.shouldStartTyping) {
return false
}
if (stripExplicitBotMention(ctx) !== null || isReplyToCurrentBot(ctx)) {
return looksLikeLikelyCompletedPurchase(record.rawText)
}
return true
}
async function finalizePurchaseReply(
ctx: Context,
pendingReply: PendingPurchaseReply | null,
@@ -943,6 +924,23 @@ export function createPurchaseMessageRepository(databaseUrl: string): {
return Boolean(clarificationContext && clarificationContext.length > 0)
},
async clearClarificationContext(record) {
await db
.update(schema.purchaseMessages)
.set({
processingStatus: 'ignored_not_purchase',
needsReview: 0
})
.where(
and(
eq(schema.purchaseMessages.householdId, record.householdId),
eq(schema.purchaseMessages.senderTelegramUserId, record.senderTelegramUserId),
eq(schema.purchaseMessages.telegramThreadId, record.threadId),
eq(schema.purchaseMessages.processingStatus, 'clarification_needed')
)
)
},
async save(record, interpreter, defaultCurrency, options) {
const matchedMember = await db
.select({ id: schema.members.id })
@@ -1441,6 +1439,118 @@ async function resolveAssistantConfig(
}
}
function memoryKeyForRecord(record: PurchaseTopicRecord): string {
return conversationMemoryKey({
telegramUserId: record.senderTelegramUserId,
telegramChatId: record.chatId,
isPrivateChat: false
})
}
function appendConversation(
memoryStore: AssistantConversationMemoryStore | undefined,
record: PurchaseTopicRecord,
userText: string,
assistantText: string
): void {
if (!memoryStore) {
return
}
const key = memoryKeyForRecord(record)
memoryStore.appendTurn(key, {
role: 'user',
text: userText
})
memoryStore.appendTurn(key, {
role: 'assistant',
text: assistantText
})
}
async function routePurchaseTopicMessage(input: {
ctx: Pick<Context, 'msg' | 'me'>
record: PurchaseTopicRecord
locale: BotLocale
repository: Pick<
PurchaseMessageIngestionRepository,
'hasClarificationContext' | 'clearClarificationContext'
>
router: TopicMessageRouter | undefined
memoryStore: AssistantConversationMemoryStore | undefined
assistantContext?: string | null
assistantTone?: string | null
}): Promise<TopicMessageRoutingResult> {
if (!input.router) {
const hasExplicitMention = stripExplicitBotMention(input.ctx) !== null
const isReply = isReplyToCurrentBot(input.ctx)
const hasClarificationContext = await input.repository.hasClarificationContext(input.record)
if (hasExplicitMention || isReply) {
return {
route: 'purchase_candidate',
replyText: null,
helperKind: 'purchase',
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 75,
reason: 'legacy_direct'
}
}
if (hasClarificationContext) {
return {
route: 'purchase_followup',
replyText: null,
helperKind: 'purchase',
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 75,
reason: 'legacy_clarification'
}
}
if (looksLikeLikelyCompletedPurchase(input.record.rawText)) {
return {
route: 'purchase_candidate',
replyText: null,
helperKind: 'purchase',
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 75,
reason: 'legacy_likely_purchase'
}
}
return {
route: 'silent',
replyText: null,
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 80,
reason: 'legacy_silent'
}
}
const key = memoryKeyForRecord(input.record)
const recentTurns = input.memoryStore?.get(key).turns ?? []
return input.router({
locale: input.locale,
topicRole: 'purchase',
messageText: input.record.rawText,
isExplicitMention: stripExplicitBotMention(input.ctx) !== null,
isReplyToBot: isReplyToCurrentBot(input.ctx),
activeWorkflow: (await input.repository.hasClarificationContext(input.record))
? 'purchase_clarification'
: null,
assistantContext: input.assistantContext ?? null,
assistantTone: input.assistantTone ?? null,
recentTurns
})
}
async function handlePurchaseMessageResult(
ctx: Context,
record: PurchaseTopicRecord,
@@ -1766,6 +1876,8 @@ export function registerPurchaseTopicIngestion(
repository: PurchaseMessageIngestionRepository,
options: {
interpreter?: PurchaseMessageInterpreter
router?: TopicMessageRouter
memoryStore?: AssistantConversationMemoryStore
logger?: Logger
} = {}
): void {
@@ -1787,20 +1899,54 @@ export function registerPurchaseTopicIngestion(
let typingIndicator: ReturnType<typeof startTypingIndicator> | null = null
try {
const engagement = await resolvePurchaseTopicEngagement(ctx, record, repository)
if (!engagement) {
const route =
getCachedTopicMessageRoute(ctx, 'purchase') ??
(await routePurchaseTopicMessage({
ctx,
record,
locale: 'en',
repository,
router: options.router,
memoryStore: options.memoryStore
}))
cacheTopicMessageRoute(ctx, 'purchase', route)
if (route.route === 'silent') {
await next()
return
}
typingIndicator = options.interpreter ? startTypingIndicator(ctx) : null
if (route.shouldClearWorkflow) {
await repository.clearClarificationContext?.(record)
}
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
if (route.replyText) {
await replyToPurchaseMessage(ctx, route.replyText)
appendConversation(options.memoryStore, record, record.rawText, route.replyText)
}
return
}
if (route.route === 'topic_helper') {
await next()
return
}
if (route.route !== 'purchase_candidate' && route.route !== 'purchase_followup') {
await next()
return
}
typingIndicator =
options.interpreter && route.shouldStartTyping ? startTypingIndicator(ctx) : null
const pendingReply =
options.interpreter && engagement.showProcessingReply
options.interpreter && shouldShowProcessingReply(ctx, record, route)
? await sendPurchaseProcessingReply(ctx, getBotTranslations('en').purchase.processing)
: null
const result = await repository.save(record, options.interpreter, 'GEL')
if (engagement.kind === 'direct' && result.status === 'ignored_not_purchase') {
if (result.status === 'ignored_not_purchase') {
return await next()
}
await handlePurchaseMessageResult(ctx, record, result, 'en', options.logger, pendingReply)
@@ -1828,6 +1974,8 @@ export function registerConfiguredPurchaseTopicIngestion(
repository: PurchaseMessageIngestionRepository,
options: {
interpreter?: PurchaseMessageInterpreter
router?: TopicMessageRouter
memoryStore?: AssistantConversationMemoryStore
logger?: Logger
} = {}
): void {
@@ -1864,13 +2012,6 @@ export function registerConfiguredPurchaseTopicIngestion(
let typingIndicator: ReturnType<typeof startTypingIndicator> | null = null
try {
const engagement = await resolvePurchaseTopicEngagement(ctx, record, repository)
if (!engagement) {
await next()
return
}
typingIndicator = options.interpreter ? startTypingIndicator(ctx) : null
const [billingSettings, assistantConfig] = await Promise.all([
householdConfigurationRepository.getHouseholdBillingSettings(record.householdId),
resolveAssistantConfig(householdConfigurationRepository, record.householdId)
@@ -1879,8 +2020,51 @@ export function registerConfiguredPurchaseTopicIngestion(
householdConfigurationRepository,
record.householdId
)
const route =
getCachedTopicMessageRoute(ctx, 'purchase') ??
(await routePurchaseTopicMessage({
ctx,
record,
locale,
repository,
router: options.router,
memoryStore: options.memoryStore,
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone
}))
cacheTopicMessageRoute(ctx, 'purchase', route)
if (route.route === 'silent') {
await next()
return
}
if (route.shouldClearWorkflow) {
await repository.clearClarificationContext?.(record)
}
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
if (route.replyText) {
await replyToPurchaseMessage(ctx, route.replyText)
appendConversation(options.memoryStore, record, record.rawText, route.replyText)
}
return
}
if (route.route === 'topic_helper') {
await next()
return
}
if (route.route !== 'purchase_candidate' && route.route !== 'purchase_followup') {
await next()
return
}
typingIndicator =
options.interpreter && route.shouldStartTyping ? startTypingIndicator(ctx) : null
const pendingReply =
options.interpreter && engagement.showProcessingReply
options.interpreter && shouldShowProcessingReply(ctx, record, route)
? await sendPurchaseProcessingReply(ctx, getBotTranslations(locale).purchase.processing)
: null
const result = await repository.save(
@@ -1892,7 +2076,7 @@ export function registerConfiguredPurchaseTopicIngestion(
assistantTone: assistantConfig.assistantTone
}
)
if (engagement.kind === 'direct' && result.status === 'ignored_not_purchase') {
if (result.status === 'ignored_not_purchase') {
return await next()
}

View File

@@ -0,0 +1,421 @@
import type { Context } from 'grammy'
import { extractOpenAiResponseText, parseJsonFromResponseText } from './openai-responses'
export type TopicMessageRole = 'generic' | 'purchase' | 'payments' | 'reminders' | 'feedback'
export type TopicWorkflowState =
| 'purchase_clarification'
| 'payment_clarification'
| 'payment_confirmation'
| null
export type TopicMessageRoute =
| 'silent'
| 'chat_reply'
| 'purchase_candidate'
| 'purchase_followup'
| 'payment_candidate'
| 'payment_followup'
| 'topic_helper'
| 'dismiss_workflow'
export interface TopicMessageRoutingInput {
locale: 'en' | 'ru'
topicRole: TopicMessageRole
messageText: string
isExplicitMention: boolean
isReplyToBot: boolean
activeWorkflow: TopicWorkflowState
assistantContext?: string | null
assistantTone?: string | null
recentTurns?: readonly {
role: 'user' | 'assistant'
text: string
}[]
}
export interface TopicMessageRoutingResult {
route: TopicMessageRoute
replyText: string | null
helperKind: 'assistant' | 'purchase' | 'payment' | 'reminder' | null
shouldStartTyping: boolean
shouldClearWorkflow: boolean
confidence: number
reason: string | null
}
export type TopicMessageRouter = (
input: TopicMessageRoutingInput
) => Promise<TopicMessageRoutingResult>
const topicMessageRouteCacheKey = Symbol('topic-message-route-cache')
type CachedTopicMessageRole = Extract<TopicMessageRole, 'purchase' | 'payments'>
type TopicMessageRouteCacheEntry = {
topicRole: CachedTopicMessageRole
route: TopicMessageRoutingResult
}
type ContextWithTopicMessageRouteCache = Context & {
[topicMessageRouteCacheKey]?: TopicMessageRouteCacheEntry
}
const BACKOFF_PATTERN =
/\b(?:leave me alone|go away|stop|not now|back off|shut up)\b|(?:^|[^\p{L}])(?:отстань|хватит|не сейчас|замолчи|оставь(?:\s+меня)?\s+в\s+покое)(?=$|[^\p{L}])/iu
const PLANNING_PATTERN =
/\b(?:want to buy|thinking about buying|thinking of buying|going to buy|plan to buy|might buy)\b|(?:^|[^\p{L}])(?:хочу|думаю|планирую|может)\s+(?:купить|взять|заказать)(?=$|[^\p{L}])/iu
const LIKELY_PURCHASE_PATTERN =
/\b(?:bought|ordered|picked up|spent|paid)\b|(?:^|[^\p{L}])(?:купил(?:а|и)?|взял(?:а|и)?|заказал(?:а|и)?|потратил(?:а|и)?|заплатил(?:а|и)?|сторговался(?:\s+до)?)(?=$|[^\p{L}])/iu
const LIKELY_PAYMENT_PATTERN =
/\b(?:paid rent|paid utilities|rent paid|utilities paid)\b|(?:^|[^\p{L}])(?:оплатил(?:а|и)?|заплатил(?:а|и)?)(?=$|[^\p{L}])/iu
const LETTER_PATTERN = /\p{L}/u
function normalizeRoute(value: string): TopicMessageRoute {
return value === 'chat_reply' ||
value === 'purchase_candidate' ||
value === 'purchase_followup' ||
value === 'payment_candidate' ||
value === 'payment_followup' ||
value === 'topic_helper' ||
value === 'dismiss_workflow'
? value
: 'silent'
}
function normalizeHelperKind(value: string | null): TopicMessageRoutingResult['helperKind'] {
return value === 'assistant' ||
value === 'purchase' ||
value === 'payment' ||
value === 'reminder'
? value
: null
}
function normalizeConfidence(value: number | null | undefined): number {
if (typeof value !== 'number' || Number.isNaN(value)) {
return 0
}
return Math.max(0, Math.min(100, Math.round(value)))
}
function fallbackReply(locale: 'en' | 'ru', kind: 'backoff' | 'watching'): string {
if (locale === 'ru') {
return kind === 'backoff'
? 'Окей, молчу.'
: 'Я тут. Если будет реальная покупка или оплата, подключусь.'
}
return kind === 'backoff'
? "Okay, I'll back off."
: "I'm here. If there's a real purchase or payment, I'll jump in."
}
export function fallbackTopicMessageRoute(
input: TopicMessageRoutingInput
): TopicMessageRoutingResult {
const normalized = input.messageText.trim()
const isAddressed = input.isExplicitMention || input.isReplyToBot
if (normalized.length === 0 || !LETTER_PATTERN.test(normalized)) {
return {
route: 'silent',
replyText: null,
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 100,
reason: 'empty'
}
}
if (BACKOFF_PATTERN.test(normalized)) {
return {
route: 'dismiss_workflow',
replyText: isAddressed ? fallbackReply(input.locale, 'backoff') : null,
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: input.activeWorkflow !== null,
confidence: 94,
reason: 'backoff'
}
}
if (input.topicRole === 'purchase') {
if (input.activeWorkflow === 'purchase_clarification') {
return {
route: 'purchase_followup',
replyText: null,
helperKind: 'purchase',
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 72,
reason: 'active_purchase_workflow'
}
}
if (!PLANNING_PATTERN.test(normalized) && LIKELY_PURCHASE_PATTERN.test(normalized)) {
return {
route: 'purchase_candidate',
replyText: null,
helperKind: 'purchase',
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 70,
reason: 'likely_purchase'
}
}
}
if (input.topicRole === 'payments') {
if (
input.activeWorkflow === 'payment_clarification' ||
input.activeWorkflow === 'payment_confirmation'
) {
return {
route: 'payment_followup',
replyText: null,
helperKind: 'payment',
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 72,
reason: 'active_payment_workflow'
}
}
if (!PLANNING_PATTERN.test(normalized) && LIKELY_PAYMENT_PATTERN.test(normalized)) {
return {
route: 'payment_candidate',
replyText: null,
helperKind: 'payment',
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 68,
reason: 'likely_payment'
}
}
}
if (isAddressed) {
return {
route: 'topic_helper',
replyText: null,
helperKind: 'assistant',
shouldStartTyping: true,
shouldClearWorkflow: false,
confidence: 60,
reason: 'addressed'
}
}
return {
route: 'silent',
replyText: null,
helperKind: null,
shouldStartTyping: false,
shouldClearWorkflow: false,
confidence: 70,
reason: 'quiet_default'
}
}
function buildRecentTurns(input: TopicMessageRoutingInput): string | null {
const recentTurns = input.recentTurns
?.slice(-4)
.map((turn) => `${turn.role}: ${turn.text.trim()}`)
.filter((line) => line.length > 0)
return recentTurns && recentTurns.length > 0
? ['Recent conversation with this user in the household chat:', ...recentTurns].join('\n')
: null
}
export function cacheTopicMessageRoute(
ctx: Context,
topicRole: CachedTopicMessageRole,
route: TopicMessageRoutingResult
): void {
;(ctx as ContextWithTopicMessageRouteCache)[topicMessageRouteCacheKey] = {
topicRole,
route
}
}
export function getCachedTopicMessageRoute(
ctx: Context,
topicRole: CachedTopicMessageRole
): TopicMessageRoutingResult | null {
const cached = (ctx as ContextWithTopicMessageRouteCache)[topicMessageRouteCacheKey]
return cached?.topicRole === topicRole ? cached.route : null
}
export function createOpenAiTopicMessageRouter(
apiKey: string | undefined,
model: string,
timeoutMs: number
): TopicMessageRouter | undefined {
if (!apiKey) {
return undefined
}
return async (input) => {
const abortController = new AbortController()
const timeout = setTimeout(() => abortController.abort(), timeoutMs)
try {
const response = await fetch('https://api.openai.com/v1/responses', {
method: 'POST',
signal: abortController.signal,
headers: {
authorization: `Bearer ${apiKey}`,
'content-type': 'application/json'
},
body: JSON.stringify({
model,
input: [
{
role: 'system',
content: [
'You are a first-pass router for a household Telegram bot in a group chat topic.',
'Your job is to decide whether the bot should stay silent, send a short playful reply, continue a workflow, or invoke a heavier helper.',
'Prefer silence over speaking.',
'Do not start purchase or payment workflows for planning, hypotheticals, negotiations, tests, or obvious jokes.',
'Treat “stop”, “leave me alone”, “just thinking”, “not a purchase”, and similar messages as backoff or dismissal signals.',
'When the user directly addresses the bot with small talk, joking, or testing, prefer chat_reply with one short sentence.',
'Use topic_helper only when the message is a real question or request that likely needs household knowledge or a topic-specific helper.',
'Use purchase_candidate only for a clear completed shared purchase.',
'Use purchase_followup only when there is active purchase clarification and the latest message looks like a real answer to it.',
'Use payment_candidate only for a clear payment confirmation.',
'Use payment_followup only when there is active payment clarification/confirmation and the latest message looks like a real answer to it.',
'For absurd or playful messages, be light and short. Never loop or interrogate.',
'Set shouldStartTyping to true only if the chosen route will likely trigger a slower helper or assistant call.',
input.assistantTone ? `Use this tone lightly: ${input.assistantTone}.` : null,
input.assistantContext
? `Household flavor context: ${input.assistantContext}`
: null,
'Return only JSON matching the schema.'
]
.filter(Boolean)
.join(' ')
},
{
role: 'user',
content: [
`User locale: ${input.locale}`,
`Topic role: ${input.topicRole}`,
`Explicit mention: ${input.isExplicitMention ? 'yes' : 'no'}`,
`Reply to bot: ${input.isReplyToBot ? 'yes' : 'no'}`,
`Active workflow: ${input.activeWorkflow ?? 'none'}`,
buildRecentTurns(input),
`Latest message:\n${input.messageText}`
]
.filter(Boolean)
.join('\n\n')
}
],
text: {
format: {
type: 'json_schema',
name: 'topic_message_route',
schema: {
type: 'object',
additionalProperties: false,
properties: {
route: {
type: 'string',
enum: [
'silent',
'chat_reply',
'purchase_candidate',
'purchase_followup',
'payment_candidate',
'payment_followup',
'topic_helper',
'dismiss_workflow'
]
},
replyText: {
anyOf: [{ type: 'string' }, { type: 'null' }]
},
helperKind: {
anyOf: [
{
type: 'string',
enum: ['assistant', 'purchase', 'payment', 'reminder']
},
{ type: 'null' }
]
},
shouldStartTyping: {
type: 'boolean'
},
shouldClearWorkflow: {
type: 'boolean'
},
confidence: {
type: 'number',
minimum: 0,
maximum: 100
},
reason: {
anyOf: [{ type: 'string' }, { type: 'null' }]
}
},
required: [
'route',
'replyText',
'helperKind',
'shouldStartTyping',
'shouldClearWorkflow',
'confidence',
'reason'
]
}
}
}
})
})
if (!response.ok) {
return fallbackTopicMessageRoute(input)
}
const payload = (await response.json()) as Record<string, unknown>
const text = extractOpenAiResponseText(payload)
const parsed = parseJsonFromResponseText(text ?? '')
if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
return fallbackTopicMessageRoute(input)
}
const parsedObject = parsed as Record<string, unknown>
const route = normalizeRoute(
typeof parsedObject.route === 'string' ? parsedObject.route : 'silent'
)
const replyText =
typeof parsedObject.replyText === 'string' && parsedObject.replyText.trim().length > 0
? parsedObject.replyText.trim()
: null
return {
route,
replyText,
helperKind:
typeof parsedObject.helperKind === 'string' || parsedObject.helperKind === null
? normalizeHelperKind(parsedObject.helperKind)
: null,
shouldStartTyping: parsedObject.shouldStartTyping === true,
shouldClearWorkflow: parsedObject.shouldClearWorkflow === true,
confidence: normalizeConfidence(
typeof parsedObject.confidence === 'number' ? parsedObject.confidence : null
),
reason: typeof parsedObject.reason === 'string' ? parsedObject.reason : null
}
} catch {
return fallbackTopicMessageRoute(input)
} finally {
clearTimeout(timeout)
}
}
}