feat(bot): support tagged assistant replies in topics

This commit is contained in:
2026-03-11 12:15:56 +04:00
parent 9553ca342a
commit fce2d30beb
7 changed files with 587 additions and 93 deletions

View File

@@ -58,6 +58,29 @@ function privateMessageUpdate(text: string) {
}
}
function topicMentionUpdate(text: string) {
return {
update_id: 3001,
message: {
message_id: 88,
date: Math.floor(Date.now() / 1000),
message_thread_id: 777,
is_topic_message: true,
chat: {
id: -100123,
type: 'supergroup'
},
from: {
id: 123456,
is_bot: false,
first_name: 'Stan',
language_code: 'en'
},
text
}
}
}
function privateCallbackUpdate(data: string) {
return {
update_id: 2002,
@@ -943,6 +966,86 @@ describe('registerDmAssistant', () => {
})
})
test('replies as the general assistant when explicitly mentioned in a household topic', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
let assistantCalls = 0
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
if (method === 'sendMessage') {
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: -100123,
type: 'supergroup'
},
text: (payload as { text?: string }).text ?? 'ok'
}
} as never
}
return {
ok: true,
result: true
} as never
})
registerDmAssistant({
bot,
assistant: {
async respond(input) {
assistantCalls += 1
expect(input.userMessage).toBe('how is life?')
return {
text: 'Still standing.',
usage: {
inputTokens: 15,
outputTokens: 4,
totalTokens: 19
}
}
}
},
householdConfigurationRepository: createHouseholdRepository(),
promptRepository: createPromptRepository(),
financeServiceForHousehold: () => createFinanceService(),
memoryStore: createInMemoryAssistantConversationMemoryStore(12),
rateLimiter: createInMemoryAssistantRateLimiter({
burstLimit: 5,
burstWindowMs: 60_000,
rollingLimit: 50,
rollingWindowMs: 86_400_000
}),
usageTracker: createInMemoryAssistantUsageTracker()
})
await bot.handleUpdate(topicMentionUpdate('@household_test_bot how is life?') as never)
expect(assistantCalls).toBe(1)
expect(calls).toHaveLength(2)
expect(calls[0]).toMatchObject({
method: 'sendChatAction',
payload: {
chat_id: -100123,
action: 'typing',
message_thread_id: 777
}
})
expect(calls[1]).toMatchObject({
method: 'sendMessage',
payload: {
chat_id: -100123,
message_thread_id: 777,
text: 'Still standing.'
}
})
})
test('ignores duplicate deliveries of the same DM update', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []

View File

@@ -19,6 +19,7 @@ import type {
PurchaseTopicRecord
} from './purchase-topic-ingestion'
import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions'
const ASSISTANT_PAYMENT_ACTION = 'assistant_payment_confirmation' as const
const ASSISTANT_PAYMENT_CONFIRM_CALLBACK_PREFIX = 'assistant_payment:confirm:'
@@ -26,6 +27,7 @@ const ASSISTANT_PAYMENT_CANCEL_CALLBACK_PREFIX = 'assistant_payment:cancel:'
const ASSISTANT_PURCHASE_CONFIRM_CALLBACK_PREFIX = 'assistant_purchase:confirm:'
const ASSISTANT_PURCHASE_CANCEL_CALLBACK_PREFIX = 'assistant_purchase:cancel:'
const DM_ASSISTANT_MESSAGE_SOURCE = 'telegram-dm-assistant'
const GROUP_ASSISTANT_MESSAGE_SOURCE = 'telegram-group-assistant'
const MEMORY_SUMMARY_MAX_CHARS = 1200
const PURCHASE_VERB_PATTERN =
/\b(?:bought|buy|got|picked up|spent|купил(?:а|и)?|взял(?:а|и)?|выложил(?:а|и)?|отдал(?:а|и)?|потратил(?:а|и)?)\b/iu
@@ -106,6 +108,10 @@ function isPrivateChat(ctx: Context): boolean {
return ctx.chat?.type === 'private'
}
function isGroupChat(ctx: Context): boolean {
return ctx.chat?.type === 'group' || ctx.chat?.type === 'supergroup'
}
function isCommandMessage(ctx: Context): boolean {
return typeof ctx.msg?.text === 'string' && ctx.msg.text.trim().startsWith('/')
}
@@ -123,6 +129,16 @@ function summarizeTurns(
: next.slice(next.length - MEMORY_SUMMARY_MAX_CHARS)
}
function conversationMemoryKey(input: {
telegramUserId: string
telegramChatId: string
isPrivateChat: boolean
}): string {
return input.isPrivateChat
? input.telegramUserId
: `group:${input.telegramChatId}:${input.telegramUserId}`
}
export function createInMemoryAssistantConversationMemoryStore(
maxTurns: number
): AssistantConversationMemoryStore {
@@ -455,6 +471,118 @@ async function buildHouseholdContext(input: {
return lines.join('\n')
}
async function replyWithAssistant(input: {
ctx: Context
assistant: ConversationalAssistant | undefined
householdId: string
memberId: string
memberDisplayName: string
telegramUserId: string
telegramChatId: string
locale: BotLocale
userMessage: string
householdConfigurationRepository: HouseholdConfigurationRepository
financeService: FinanceCommandService
memoryStore: AssistantConversationMemoryStore
usageTracker: AssistantUsageTracker
logger: Logger | undefined
}): Promise<void> {
const t = getBotTranslations(input.locale).assistant
if (!input.assistant) {
await input.ctx.reply(t.unavailable)
return
}
const memoryKey = conversationMemoryKey({
telegramUserId: input.telegramUserId,
telegramChatId: input.telegramChatId,
isPrivateChat: isPrivateChat(input.ctx)
})
const memory = input.memoryStore.get(memoryKey)
const typingIndicator = startTypingIndicator(input.ctx)
const assistantStartedAt = Date.now()
let stage: 'household_context' | 'assistant_response' = 'household_context'
let contextBuildMs: number | null = null
let assistantResponseMs: number | null = null
try {
const contextStartedAt = Date.now()
const householdContext = await buildHouseholdContext({
householdId: input.householdId,
memberId: input.memberId,
memberDisplayName: input.memberDisplayName,
locale: input.locale,
householdConfigurationRepository: input.householdConfigurationRepository,
financeService: input.financeService
})
contextBuildMs = Date.now() - contextStartedAt
stage = 'assistant_response'
const assistantResponseStartedAt = Date.now()
const reply = await input.assistant.respond({
locale: input.locale,
householdContext,
memorySummary: memory.summary,
recentTurns: memory.turns,
userMessage: input.userMessage
})
assistantResponseMs = Date.now() - assistantResponseStartedAt
input.usageTracker.record({
householdId: input.householdId,
telegramUserId: input.telegramUserId,
displayName: input.memberDisplayName,
usage: reply.usage
})
input.memoryStore.appendTurn(memoryKey, {
role: 'user',
text: input.userMessage
})
input.memoryStore.appendTurn(memoryKey, {
role: 'assistant',
text: reply.text
})
input.logger?.info(
{
event: 'assistant.reply',
householdId: input.householdId,
telegramUserId: input.telegramUserId,
contextBuildMs,
assistantResponseMs,
totalDurationMs: Date.now() - assistantStartedAt,
householdContextChars: householdContext.length,
recentTurnsCount: memory.turns.length,
memorySummaryChars: memory.summary?.length ?? 0,
inputTokens: reply.usage.inputTokens,
outputTokens: reply.usage.outputTokens,
totalTokens: reply.usage.totalTokens
},
'Assistant reply generated'
)
await input.ctx.reply(reply.text)
} catch (error) {
input.logger?.error(
{
event: 'assistant.reply_failed',
householdId: input.householdId,
telegramUserId: input.telegramUserId,
stage,
contextBuildMs,
assistantResponseMs,
totalDurationMs: Date.now() - assistantStartedAt,
...describeError(error),
error
},
'Assistant reply failed'
)
await input.ctx.reply(t.unavailable)
} finally {
typingIndicator.stop()
}
}
export function registerDmAssistant(options: {
bot: Bot
assistant?: ConversationalAssistant
@@ -741,6 +869,11 @@ export function registerDmAssistant(options: {
await next()
return
}
const memoryKey = conversationMemoryKey({
telegramUserId,
telegramChatId,
isPrivateChat: true
})
const memberships =
await options.householdConfigurationRepository.listHouseholdMembersByTelegramUserId(
@@ -831,11 +964,11 @@ export function registerDmAssistant(options: {
? buildPurchaseClarificationText(locale, purchaseResult)
: getBotTranslations(locale).purchase.parseFailed
options.memoryStore.appendTurn(telegramUserId, {
options.memoryStore.appendTurn(memoryKey, {
role: 'user',
text: ctx.msg.text
})
options.memoryStore.appendTurn(telegramUserId, {
options.memoryStore.appendTurn(memoryKey, {
role: 'assistant',
text: purchaseText
})
@@ -902,11 +1035,11 @@ export function registerDmAssistant(options: {
amount.toMajorString(),
amount.currency
)
options.memoryStore.appendTurn(telegramUserId, {
options.memoryStore.appendTurn(memoryKey, {
role: 'user',
text: ctx.msg.text
})
options.memoryStore.appendTurn(telegramUserId, {
options.memoryStore.appendTurn(memoryKey, {
role: 'assistant',
text: proposalText
})
@@ -917,93 +1050,22 @@ export function registerDmAssistant(options: {
return
}
if (!options.assistant) {
await ctx.reply(t.unavailable)
return
}
const memory = options.memoryStore.get(telegramUserId)
const typingIndicator = startTypingIndicator(ctx)
const assistantStartedAt = Date.now()
let stage: 'household_context' | 'assistant_response' = 'household_context'
let contextBuildMs: number | null = null
let assistantResponseMs: number | null = null
try {
const contextStartedAt = Date.now()
const householdContext = await buildHouseholdContext({
householdId: member.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
locale,
householdConfigurationRepository: options.householdConfigurationRepository,
financeService
})
contextBuildMs = Date.now() - contextStartedAt
stage = 'assistant_response'
const assistantResponseStartedAt = Date.now()
const reply = await options.assistant.respond({
locale,
householdContext,
memorySummary: memory.summary,
recentTurns: memory.turns,
userMessage: ctx.msg.text
})
assistantResponseMs = Date.now() - assistantResponseStartedAt
options.usageTracker.record({
householdId: member.householdId,
telegramUserId,
displayName: member.displayName,
usage: reply.usage
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'user',
text: ctx.msg.text
})
options.memoryStore.appendTurn(telegramUserId, {
role: 'assistant',
text: reply.text
})
options.logger?.info(
{
event: 'assistant.reply',
householdId: member.householdId,
telegramUserId,
contextBuildMs,
assistantResponseMs,
totalDurationMs: Date.now() - assistantStartedAt,
householdContextChars: householdContext.length,
recentTurnsCount: memory.turns.length,
memorySummaryChars: memory.summary?.length ?? 0,
inputTokens: reply.usage.inputTokens,
outputTokens: reply.usage.outputTokens,
totalTokens: reply.usage.totalTokens
},
'DM assistant reply generated'
)
await ctx.reply(reply.text)
} catch (error) {
options.logger?.error(
{
event: 'assistant.reply_failed',
householdId: member.householdId,
telegramUserId,
stage,
contextBuildMs,
assistantResponseMs,
totalDurationMs: Date.now() - assistantStartedAt,
...describeError(error),
error
},
'DM assistant reply failed'
)
await ctx.reply(t.unavailable)
} finally {
typingIndicator.stop()
}
await replyWithAssistant({
ctx,
assistant: options.assistant,
householdId: member.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
telegramUserId,
telegramChatId,
locale,
userMessage: ctx.msg.text,
householdConfigurationRepository: options.householdConfigurationRepository,
financeService,
memoryStore: options.memoryStore,
usageTracker: options.usageTracker,
logger: options.logger
})
} catch (error) {
if (dedupeClaim) {
await dedupeClaim.repository.releaseMessage({
@@ -1016,4 +1078,108 @@ export function registerDmAssistant(options: {
throw error
}
})
options.bot.on('message:text', async (ctx, next) => {
if (!isGroupChat(ctx) || isCommandMessage(ctx)) {
await next()
return
}
const mention = stripExplicitBotMention(ctx)
if (!mention || mention.strippedText.length === 0) {
await next()
return
}
const telegramUserId = ctx.from?.id?.toString()
const telegramChatId = ctx.chat?.id?.toString()
if (!telegramUserId || !telegramChatId) {
await next()
return
}
const household =
await options.householdConfigurationRepository.getTelegramHouseholdChat(telegramChatId)
if (!household) {
await next()
return
}
const member = await options.householdConfigurationRepository.getHouseholdMember(
household.householdId,
telegramUserId
)
if (!member) {
await next()
return
}
const locale = member.preferredLocale ?? household.defaultLocale ?? 'en'
const rateLimit = options.rateLimiter.consume(`${household.householdId}:${telegramUserId}`)
const t = getBotTranslations(locale).assistant
if (!rateLimit.allowed) {
await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs)))
return
}
const updateId = ctx.update.update_id?.toString()
const dedupeClaim =
options.messageProcessingRepository && typeof updateId === 'string'
? {
repository: options.messageProcessingRepository,
updateId
}
: null
if (dedupeClaim) {
const claim = await dedupeClaim.repository.claimMessage({
householdId: household.householdId,
source: GROUP_ASSISTANT_MESSAGE_SOURCE,
sourceMessageKey: dedupeClaim.updateId
})
if (!claim.claimed) {
options.logger?.info(
{
event: 'assistant.duplicate_update',
householdId: household.householdId,
telegramUserId,
updateId: dedupeClaim.updateId
},
'Duplicate group assistant mention ignored'
)
return
}
}
try {
await replyWithAssistant({
ctx,
assistant: options.assistant,
householdId: household.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
telegramUserId,
telegramChatId,
locale,
userMessage: mention.strippedText,
householdConfigurationRepository: options.householdConfigurationRepository,
financeService: options.financeServiceForHousehold(household.householdId),
memoryStore: options.memoryStore,
usageTracker: options.usageTracker,
logger: options.logger
})
} catch (error) {
if (dedupeClaim) {
await dedupeClaim.repository.releaseMessage({
householdId: household.householdId,
source: GROUP_ASSISTANT_MESSAGE_SOURCE,
sourceMessageKey: dedupeClaim.updateId
})
}
throw error
}
})
}

View File

@@ -556,4 +556,90 @@ describe('registerConfiguredPaymentTopicIngestion', () => {
expect(paymentConfirmationService.submitted).toHaveLength(0)
})
test('skips explicitly tagged bot messages in the payments topic', async () => {
const bot = createTelegramBot('000000:test-token')
const calls: Array<{ method: string; payload: unknown }> = []
const promptRepository = createPromptRepository()
const paymentConfirmationService = createPaymentConfirmationService()
bot.botInfo = {
id: 999000,
is_bot: true,
first_name: 'Household Test Bot',
username: 'household_test_bot',
can_join_groups: true,
can_read_all_group_messages: false,
supports_inline_queries: false,
can_connect_to_business: false,
has_main_web_app: false,
has_topics_enabled: true,
allows_users_to_create_topics: false
}
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: true
} as never
})
registerConfiguredPaymentTopicIngestion(
bot,
createHouseholdRepository() as never,
promptRepository,
() => createFinanceService(),
() => paymentConfirmationService
)
await bot.handleUpdate(paymentUpdate('@household_test_bot как жизнь?') as never)
expect(calls).toHaveLength(0)
expect(await promptRepository.getPendingAction('-10012345', '10002')).toBeNull()
})
test('still handles tagged payment-like messages in the payments topic', async () => {
const bot = createTelegramBot('000000:test-token')
const calls: Array<{ method: string; payload: unknown }> = []
const promptRepository = createPromptRepository()
const paymentConfirmationService = createPaymentConfirmationService()
bot.botInfo = {
id: 999000,
is_bot: true,
first_name: 'Household Test Bot',
username: 'household_test_bot',
can_join_groups: true,
can_read_all_group_messages: false,
supports_inline_queries: false,
can_connect_to_business: false,
has_main_web_app: false,
has_topics_enabled: true,
allows_users_to_create_topics: false
}
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: true
} as never
})
registerConfiguredPaymentTopicIngestion(
bot,
createHouseholdRepository() as never,
promptRepository,
() => createFinanceService(),
() => paymentConfirmationService
)
await bot.handleUpdate(paymentUpdate('@household_test_bot за жилье закинул') as never)
expect(calls).toHaveLength(1)
expect(calls[0]?.payload).toMatchObject({
text: 'Я могу записать эту оплату аренды: 472.50 GEL. Подтвердите или отмените ниже.'
})
})
})

View File

@@ -15,6 +15,7 @@ import {
parsePaymentProposalPayload,
synthesizePaymentConfirmationText
} from './payment-proposals'
import { stripExplicitBotMention } from './telegram-mentions'
const PAYMENT_TOPIC_CONFIRM_CALLBACK_PREFIX = 'payment_topic:confirm:'
const PAYMENT_TOPIC_CANCEL_CALLBACK_PREFIX = 'payment_topic:cancel:'
@@ -95,7 +96,7 @@ function attachmentCount(ctx: Context): number {
function toCandidateFromContext(ctx: Context): PaymentTopicCandidate | null {
const message = ctx.message
const rawText = readMessageText(ctx)
const rawText = stripExplicitBotMention(ctx)?.strippedText ?? readMessageText(ctx)
if (!message || !rawText) {
return null
}

View File

@@ -542,6 +542,95 @@ describe('registerPurchaseTopicIngestion', () => {
expect(calls).toHaveLength(0)
})
test('skips explicitly tagged bot messages in the purchase topic', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
let saveCalls = 0
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: true
} as never
})
const repository: PurchaseMessageIngestionRepository = {
async hasClarificationContext() {
return false
},
async save() {
saveCalls += 1
return {
status: 'ignored_not_purchase' as const,
purchaseMessageId: 'ignored-1'
}
},
async confirm() {
throw new Error('not used')
},
async cancel() {
throw new Error('not used')
}
}
registerPurchaseTopicIngestion(bot, config, repository)
await bot.handleUpdate(purchaseUpdate('@household_test_bot how is life?') as never)
expect(saveCalls).toBe(1)
expect(calls).toHaveLength(0)
})
test('still handles tagged purchase-like messages in the purchase topic', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
return {
ok: true,
result: true
} as never
})
const repository: PurchaseMessageIngestionRepository = {
async hasClarificationContext() {
return false
},
async save(record) {
expect(record.rawText).toBe('Bought toilet paper 30 gel')
return {
status: 'pending_confirmation',
purchaseMessageId: 'proposal-1',
parsedAmountMinor: 3000n,
parsedCurrency: 'GEL',
parsedItemDescription: 'toilet paper',
parserConfidence: 92,
parserMode: 'llm'
}
},
async confirm() {
throw new Error('not used')
},
async cancel() {
throw new Error('not used')
}
}
registerPurchaseTopicIngestion(bot, config, repository)
await bot.handleUpdate(
purchaseUpdate('@household_test_bot Bought toilet paper 30 gel') as never
)
expect(calls).toHaveLength(1)
expect(calls[0]).toMatchObject({
method: 'sendMessage',
payload: {
text: 'I think this shared purchase was: toilet paper - 30.00 GEL. Confirm or cancel below.'
}
})
})
test('confirms a pending proposal and edits the bot message', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []

View File

@@ -14,6 +14,7 @@ import type {
PurchaseMessageInterpreter
} from './openai-purchase-interpreter'
import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions'
const PURCHASE_CONFIRM_CALLBACK_PREFIX = 'purchase:confirm:'
const PURCHASE_CANCEL_CALLBACK_PREFIX = 'purchase:cancel:'
@@ -392,7 +393,7 @@ function toCandidateFromContext(ctx: Context): PurchaseTopicCandidate | null {
messageId: message.message_id.toString(),
threadId: message.message_thread_id.toString(),
senderTelegramUserId,
rawText: message.text,
rawText: stripExplicitBotMention(ctx)?.strippedText ?? message.text,
messageSentAt: instantFromEpochSeconds(message.date)
}
@@ -1054,6 +1055,9 @@ export function registerPurchaseTopicIngestion(
? await sendPurchaseProcessingReply(ctx, getBotTranslations('en').purchase.processing)
: null
const result = await repository.save(record, options.interpreter, 'GEL')
if (stripExplicitBotMention(ctx) && result.status === 'ignored_not_purchase') {
return await next()
}
await handlePurchaseMessageResult(ctx, record, result, 'en', options.logger, pendingReply)
} catch (error) {
options.logger?.error(
@@ -1130,6 +1134,9 @@ export function registerConfiguredPurchaseTopicIngestion(
options.interpreter,
billingSettings.settlementCurrency
)
if (stripExplicitBotMention(ctx) && result.status === 'ignored_not_purchase') {
return await next()
}
await handlePurchaseMessageResult(ctx, record, result, locale, options.logger, pendingReply)
} catch (error) {

View File

@@ -0,0 +1,42 @@
import type { Context } from 'grammy'
function escapeRegExp(value: string): string {
return value.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}
function getMessageText(ctx: Pick<Context, 'msg'>): string | null {
const message = ctx.msg
if (!message || !('text' in message) || typeof message.text !== 'string') {
return null
}
return message.text
}
export function stripExplicitBotMention(ctx: Pick<Context, 'msg' | 'me'>): {
originalText: string
strippedText: string
} | null {
const text = getMessageText(ctx)
const username = ctx.me.username
if (!text || !username) {
return null
}
const mentionPattern = new RegExp(`(^|\\s)@${escapeRegExp(username)}\\b`, 'giu')
if (!mentionPattern.test(text)) {
return null
}
mentionPattern.lastIndex = 0
return {
originalText: text,
strippedText: text.replace(mentionPattern, '$1').replace(/\s+/gu, ' ').trim()
}
}
export function hasExplicitBotMention(ctx: Pick<Context, 'msg' | 'me'>): boolean {
return stripExplicitBotMention(ctx) !== null
}