feat(bot): add topic history-aware assistant replies

This commit is contained in:
2026-03-12 19:06:51 +04:00
parent 5ebae7714c
commit 23faeef738
19 changed files with 4274 additions and 28 deletions

View File

@@ -6,7 +6,9 @@ import type {
HouseholdConfigurationRepository,
ProcessedBotMessageRepository,
TelegramPendingActionRecord,
TelegramPendingActionRepository
TelegramPendingActionRepository,
TopicMessageHistoryRecord,
TopicMessageHistoryRepository
} from '@household/ports'
import { createTelegramBot } from './bot'
@@ -679,6 +681,37 @@ function createProcessedBotMessageRepository(): ProcessedBotMessageRepository {
}
}
function createTopicMessageHistoryRepository(): TopicMessageHistoryRepository {
const rows: TopicMessageHistoryRecord[] = []
return {
async saveMessage(input) {
rows.push(input)
},
async listRecentThreadMessages(input) {
return rows
.filter(
(row) =>
row.householdId === input.householdId &&
row.telegramChatId === input.telegramChatId &&
row.telegramThreadId === input.telegramThreadId
)
.slice(-input.limit)
},
async listRecentChatMessages(input) {
return rows
.filter(
(row) =>
row.householdId === input.householdId &&
row.telegramChatId === input.telegramChatId &&
row.messageSentAt &&
row.messageSentAt.epochMilliseconds >= input.sentAtOrAfter.epochMilliseconds
)
.slice(-input.limit)
}
}
}
describe('registerDmAssistant', () => {
test('replies with a conversational DM answer and records token usage', async () => {
const bot = createTestBot()
@@ -1703,6 +1736,81 @@ Confirm or cancel below.`,
})
})
test('loads persisted thread and same-day chat history for memory-style prompts', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
const topicMessageHistoryRepository = createTopicMessageHistoryRepository()
let recentThreadTexts: string[] = []
let sameDayTexts: string[] = []
bot.api.config.use(async (_prev, method, payload) => {
calls.push({ method, payload })
if (method === 'sendMessage') {
return {
ok: true,
result: {
message_id: calls.length,
date: Math.floor(Date.now() / 1000),
chat: {
id: -100123,
type: 'supergroup'
},
text: (payload as { text?: string }).text ?? 'ok'
}
} as never
}
return {
ok: true,
result: true
} as never
})
registerDmAssistant({
bot,
assistant: {
async respond(input) {
recentThreadTexts = input.recentThreadMessages?.map((message) => message.text) ?? []
sameDayTexts = input.sameDayChatMessages?.map((message) => message.text) ?? []
return {
text: 'Yes. You were discussing a TV for the house.',
usage: {
inputTokens: 20,
outputTokens: 9,
totalTokens: 29
}
}
}
},
householdConfigurationRepository: createHouseholdRepository(),
promptRepository: createPromptRepository(),
financeServiceForHousehold: () => createFinanceService(),
memoryStore: createInMemoryAssistantConversationMemoryStore(12),
rateLimiter: createInMemoryAssistantRateLimiter({
burstLimit: 5,
burstWindowMs: 60_000,
rollingLimit: 50,
rollingWindowMs: 86_400_000
}),
usageTracker: createInMemoryAssistantUsageTracker(),
topicMessageHistoryRepository
})
await bot.handleUpdate(topicMessageUpdate('I think we need a TV in the house') as never)
await bot.handleUpdate(topicMessageUpdate('Bot, do you remember what we said today?') as never)
expect(recentThreadTexts).toContain('I think we need a TV in the house')
expect(sameDayTexts).toContain('I think we need a TV in the house')
expect(calls.at(-1)).toMatchObject({
method: 'sendMessage',
payload: {
text: 'Yes. You were discussing a TV for the house.'
}
})
})
test('ignores duplicate deliveries of the same DM update', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []

View File

@@ -4,7 +4,8 @@ import type { Logger } from '@household/observability'
import type {
HouseholdConfigurationRepository,
ProcessedBotMessageRepository,
TelegramPendingActionRepository
TelegramPendingActionRepository,
TopicMessageHistoryRepository
} from '@household/ports'
import type { Bot, Context } from 'grammy'
@@ -37,6 +38,11 @@ import {
getCachedTopicMessageRoute,
looksLikeDirectBotAddress
} from './topic-message-router'
import {
historyRecordToTurn,
shouldLoadExpandedChatHistory,
startOfCurrentDayInTimezone
} from './topic-history'
import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions'
@@ -319,6 +325,92 @@ async function resolveAssistantConfig(
}
}
function currentThreadId(ctx: Context): string | null {
return ctx.msg && 'message_thread_id' in ctx.msg && ctx.msg.message_thread_id !== undefined
? ctx.msg.message_thread_id.toString()
: null
}
function currentMessageId(ctx: Context): string | null {
return ctx.msg?.message_id?.toString() ?? null
}
function currentMessageSentAt(ctx: Context) {
return typeof ctx.msg?.date === 'number' ? instantFromEpochSeconds(ctx.msg.date) : null
}
async function listRecentThreadMessages(input: {
repository: TopicMessageHistoryRepository | undefined
householdId: string
telegramChatId: string
telegramThreadId: string | null
}) {
if (!input.repository || !input.telegramThreadId) {
return []
}
const messages = await input.repository.listRecentThreadMessages({
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
limit: 8
})
return messages.map(historyRecordToTurn)
}
async function listExpandedChatMessages(input: {
repository: TopicMessageHistoryRepository | undefined
householdId: string
telegramChatId: string
timezone: string
shouldLoad: boolean
}) {
if (!input.repository || !input.shouldLoad) {
return []
}
const messages = await input.repository.listRecentChatMessages({
householdId: input.householdId,
telegramChatId: input.telegramChatId,
sentAtOrAfter: startOfCurrentDayInTimezone(input.timezone),
limit: 40
})
return messages.map(historyRecordToTurn)
}
async function persistIncomingTopicMessage(input: {
repository: TopicMessageHistoryRepository | undefined
householdId: string
telegramChatId: string
telegramThreadId: string | null
telegramMessageId: string | null
telegramUpdateId: string | null
senderTelegramUserId: string
senderDisplayName: string | null
rawText: string
messageSentAt: ReturnType<typeof currentMessageSentAt>
}) {
const normalizedText = input.rawText.trim()
if (!input.repository || normalizedText.length === 0) {
return
}
await input.repository.saveMessage({
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
telegramMessageId: input.telegramMessageId,
telegramUpdateId: input.telegramUpdateId,
senderTelegramUserId: input.senderTelegramUserId,
senderDisplayName: input.senderDisplayName,
isBot: false,
rawText: normalizedText,
messageSentAt: input.messageSentAt
})
}
async function routeGroupAssistantMessage(input: {
router: TopicMessageRouter | undefined
locale: BotLocale
@@ -330,6 +422,12 @@ async function routeGroupAssistantMessage(input: {
assistantTone: string | null
memoryStore: AssistantConversationMemoryStore
memoryKey: string
recentThreadMessages: readonly {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}[]
}) {
if (!input.router) {
return fallbackTopicMessageRoute({
@@ -341,7 +439,8 @@ async function routeGroupAssistantMessage(input: {
activeWorkflow: null,
assistantContext: input.assistantContext,
assistantTone: input.assistantTone,
recentTurns: input.memoryStore.get(input.memoryKey).turns
recentTurns: input.memoryStore.get(input.memoryKey).turns,
recentThreadMessages: input.recentThreadMessages
})
}
@@ -354,7 +453,8 @@ async function routeGroupAssistantMessage(input: {
activeWorkflow: null,
assistantContext: input.assistantContext,
assistantTone: input.assistantTone,
recentTurns: input.memoryStore.get(input.memoryKey).turns
recentTurns: input.memoryStore.get(input.memoryKey).turns,
recentThreadMessages: input.recentThreadMessages
})
}
@@ -469,6 +569,7 @@ async function buildHouseholdContext(input: {
async function replyWithAssistant(input: {
ctx: Context
assistant: ConversationalAssistant | undefined
topicRole: TopicMessageRole
householdId: string
memberId: string
memberDisplayName: string
@@ -481,6 +582,18 @@ async function replyWithAssistant(input: {
memoryStore: AssistantConversationMemoryStore
usageTracker: AssistantUsageTracker
logger: Logger | undefined
recentThreadMessages: readonly {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}[]
sameDayChatMessages: readonly {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}[]
}): Promise<void> {
const t = getBotTranslations(input.locale).assistant
@@ -516,9 +629,12 @@ async function replyWithAssistant(input: {
const assistantResponseStartedAt = Date.now()
const reply = await input.assistant.respond({
locale: input.locale,
topicRole: input.topicRole,
householdContext,
memorySummary: memory.summary,
recentTurns: memory.turns,
recentThreadMessages: input.recentThreadMessages,
sameDayChatMessages: input.sameDayChatMessages,
userMessage: input.userMessage
})
assistantResponseMs = Date.now() - assistantResponseStartedAt
@@ -582,6 +698,7 @@ export function registerDmAssistant(options: {
bot: Bot
assistant?: ConversationalAssistant
topicRouter?: TopicMessageRouter
topicMessageHistoryRepository?: TopicMessageHistoryRepository
purchaseRepository?: PurchaseMessageIngestionRepository
purchaseInterpreter?: PurchaseMessageInterpreter
householdConfigurationRepository: HouseholdConfigurationRepository
@@ -1100,6 +1217,7 @@ export function registerDmAssistant(options: {
await replyWithAssistant({
ctx,
assistant: options.assistant,
topicRole: 'generic',
householdId: member.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
@@ -1111,7 +1229,9 @@ export function registerDmAssistant(options: {
financeService,
memoryStore: options.memoryStore,
usageTracker: options.usageTracker,
logger: options.logger
logger: options.logger,
recentThreadMessages: [],
sameDayChatMessages: []
})
} catch (error) {
if (dedupeClaim) {
@@ -1217,6 +1337,7 @@ export function registerDmAssistant(options: {
telegramChatId,
isPrivateChat: false
})
const telegramThreadId = currentThreadId(ctx)
const messageText = mention?.strippedText ?? ctx.msg.text.trim()
const assistantConfig = await resolveAssistantConfig(
options.householdConfigurationRepository,
@@ -1233,6 +1354,12 @@ export function registerDmAssistant(options: {
topicRole === 'purchase' || topicRole === 'payments'
? getCachedTopicMessageRoute(ctx, topicRole)
: null
const recentThreadMessages = await listRecentThreadMessages({
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId
})
const route =
cachedRoute ??
(options.topicRouter
@@ -1246,7 +1373,8 @@ export function registerDmAssistant(options: {
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone,
memoryStore: options.memoryStore,
memoryKey
memoryKey,
recentThreadMessages
})
: null)
@@ -1367,6 +1495,7 @@ export function registerDmAssistant(options: {
await replyWithAssistant({
ctx,
assistant: options.assistant,
topicRole,
householdId: household.householdId,
memberId: member.id,
memberDisplayName: member.displayName,
@@ -1378,7 +1507,15 @@ export function registerDmAssistant(options: {
financeService,
memoryStore: options.memoryStore,
usageTracker: options.usageTracker,
logger: options.logger
logger: options.logger,
recentThreadMessages,
sameDayChatMessages: await listExpandedChatMessages({
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
timezone: settings.timezone,
shouldLoad: shouldLoadExpandedChatHistory(messageText)
})
})
} catch (error) {
if (dedupeClaim) {
@@ -1390,6 +1527,19 @@ export function registerDmAssistant(options: {
}
throw error
} finally {
await persistIncomingTopicMessage({
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId: currentThreadId(ctx),
telegramMessageId: currentMessageId(ctx),
telegramUpdateId: ctx.update.update_id?.toString() ?? null,
senderTelegramUserId: telegramUserId,
senderDisplayName: ctx.from?.first_name ?? member.displayName ?? ctx.from?.username ?? null,
rawText: mention?.strippedText ?? ctx.msg.text.trim(),
messageSentAt: currentMessageSentAt(ctx)
})
}
})
}

View File

@@ -18,7 +18,8 @@ import {
createDbHouseholdConfigurationRepository,
createDbProcessedBotMessageRepository,
createDbReminderDispatchRepository,
createDbTelegramPendingActionRepository
createDbTelegramPendingActionRepository,
createDbTopicMessageHistoryRepository
} from '@household/adapters-db'
import { configureLogger, getLogger } from '@household/observability'
@@ -127,6 +128,9 @@ const processedBotMessageRepositoryClient =
const purchaseRepositoryClient = runtime.databaseUrl
? createPurchaseMessageRepository(runtime.databaseUrl!)
: null
const topicMessageHistoryRepositoryClient = runtime.databaseUrl
? createDbTopicMessageHistoryRepository(runtime.databaseUrl!)
: null
const purchaseInterpreter = createOpenAiPurchaseInterpreter(
runtime.openaiApiKey,
runtime.purchaseParserModel
@@ -237,6 +241,10 @@ if (purchaseRepositoryClient) {
shutdownTasks.push(purchaseRepositoryClient.close)
}
if (topicMessageHistoryRepositoryClient) {
shutdownTasks.push(topicMessageHistoryRepositoryClient.close)
}
if (purchaseRepositoryClient && householdConfigurationRepositoryClient) {
registerConfiguredPurchaseTopicIngestion(
bot,
@@ -246,7 +254,12 @@ if (purchaseRepositoryClient && householdConfigurationRepositoryClient) {
...(topicMessageRouter
? {
router: topicMessageRouter,
memoryStore: assistantMemoryStore
memoryStore: assistantMemoryStore,
...(topicMessageHistoryRepositoryClient
? {
historyRepository: topicMessageHistoryRepositoryClient.repository
}
: {})
}
: {}),
...(purchaseInterpreter
@@ -268,7 +281,12 @@ if (purchaseRepositoryClient && householdConfigurationRepositoryClient) {
...(topicMessageRouter
? {
router: topicMessageRouter,
memoryStore: assistantMemoryStore
memoryStore: assistantMemoryStore,
...(topicMessageHistoryRepositoryClient
? {
historyRepository: topicMessageHistoryRepositoryClient.repository
}
: {})
}
: {}),
logger: getLogger('payment-ingestion')
@@ -440,6 +458,11 @@ if (
purchaseRepository: purchaseRepositoryClient.repository
}
: {}),
...(topicMessageHistoryRepositoryClient
? {
topicMessageHistoryRepository: topicMessageHistoryRepositoryClient.repository
}
: {}),
...(purchaseInterpreter
? {
purchaseInterpreter
@@ -471,6 +494,11 @@ if (
purchaseRepository: purchaseRepositoryClient.repository
}
: {}),
...(topicMessageHistoryRepositoryClient
? {
topicMessageHistoryRepository: topicMessageHistoryRepositoryClient.repository
}
: {}),
...(purchaseInterpreter
? {
purchaseInterpreter

View File

@@ -41,6 +41,7 @@ describe('createOpenAiChatAssistant', () => {
try {
const reply = await assistant!.respond({
locale: 'en',
topicRole: 'reminders',
householdContext: 'Household: Kojori House',
memorySummary: null,
recentTurns: [],
@@ -51,10 +52,16 @@ describe('createOpenAiChatAssistant', () => {
expect(capturedBody).not.toBeNull()
expect(capturedBody!.max_output_tokens).toBe(220)
expect(capturedBody!.model).toBe('gpt-5-mini')
expect(capturedBody!.input[0]).toMatchObject({
role: 'system',
content: expect.stringContaining('Default to one to three short sentences.')
})
expect(capturedBody!.input[0]?.role).toBe('system')
expect(capturedBody!.input[0]?.content).toContain('Default to one to three short sentences.')
expect(capturedBody!.input[0]?.content).toContain(
'There is no general feature for creating or scheduling arbitrary personal reminders'
)
expect(capturedBody!.input[1]?.role).toBe('system')
expect(capturedBody!.input[1]?.content).toContain('Topic role: reminders')
expect(capturedBody!.input[1]?.content).toContain(
'You cannot create, schedule, snooze, or manage arbitrary personal reminders.'
)
} finally {
globalThis.fetch = originalFetch
}

View File

@@ -1,4 +1,5 @@
import { extractOpenAiResponseText, type OpenAiResponsePayload } from './openai-responses'
import type { TopicMessageRole } from './topic-message-router'
const ASSISTANT_MAX_OUTPUT_TOKENS = 220
@@ -16,16 +17,70 @@ export interface AssistantReply {
export interface ConversationalAssistant {
respond(input: {
locale: 'en' | 'ru'
topicRole: TopicMessageRole
householdContext: string
memorySummary: string | null
recentTurns: readonly {
role: 'user' | 'assistant'
text: string
}[]
recentThreadMessages?: readonly {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}[]
sameDayChatMessages?: readonly {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}[]
userMessage: string
}): Promise<AssistantReply>
}
function topicCapabilityNotes(topicRole: TopicMessageRole): string {
switch (topicRole) {
case 'purchase':
return [
'Purchase topic capabilities:',
'- You can discuss shared household purchases, clarify intent, and help with purchase recording flow.',
'- You cannot claim a purchase was saved unless the system explicitly confirmed it.',
'- You cannot create unrelated reminders, tasks, or household settings changes.'
].join('\n')
case 'payments':
return [
'Payments topic capabilities:',
'- You can discuss rent and utility payment status and supported payment confirmation flows.',
'- You cannot claim a payment was recorded unless the system explicitly confirmed it.',
'- You cannot schedule reminders or create arbitrary tasks.'
].join('\n')
case 'reminders':
return [
'Reminders topic capabilities:',
'- You can discuss existing household rent/utilities reminder timing and the supported utility-bill collection flow.',
'- You cannot create, schedule, snooze, or manage arbitrary personal reminders.',
'- You cannot promise future reminder setup. If asked, say that this feature is not supported.'
].join('\n')
case 'feedback':
return [
'Feedback topic capabilities:',
'- You can discuss the anonymous feedback flow and household feedback context.',
'- You cannot claim a submission was posted unless the system explicitly confirmed it.',
'- You cannot schedule reminders or create unrelated workflow items.'
].join('\n')
case 'generic':
default:
return [
'General household chat capabilities:',
'- You can answer household finance and context questions using the provided information.',
'- You cannot create arbitrary reminders, scheduled tasks, or background jobs.',
'- Never imply unsupported features exist.'
].join('\n')
}
}
const ASSISTANT_SYSTEM_PROMPT = [
'You are Kojori, a household finance assistant for one specific household.',
'Stay within the provided household context and recent conversation context.',
@@ -41,6 +96,8 @@ const ASSISTANT_SYSTEM_PROMPT = [
'If the user tells you to stop, back off briefly and do not keep asking follow-up questions.',
'Do not repeat the same clarification after the user declines, backs off, or says they are only thinking.',
'Do not restate the full household context unless the user explicitly asks for details.',
'Do not imply capabilities that are not explicitly provided in the system context.',
'There is no general feature for creating or scheduling arbitrary personal reminders unless the system explicitly says so.',
'Avoid bullet lists unless the user asked for a list or several distinct items.',
'Reply in the user language inferred from the latest user message and locale context.'
].join(' ')
@@ -79,6 +136,8 @@ export function createOpenAiChatAssistant(
role: 'system',
content: [
`User locale: ${input.locale}`,
`Topic role: ${input.topicRole}`,
topicCapabilityNotes(input.topicRole),
'Bounded household context:',
input.householdContext,
input.memorySummary ? `Conversation summary:\n${input.memorySummary}` : null,
@@ -87,6 +146,24 @@ export function createOpenAiChatAssistant(
'Recent conversation turns:',
...input.recentTurns.map((turn) => `${turn.role}: ${turn.text}`)
].join('\n')
: null,
input.recentThreadMessages && input.recentThreadMessages.length > 0
? [
'Recent topic thread messages:',
...input.recentThreadMessages.map(
(message) => `${message.speaker} (${message.role}): ${message.text}`
)
].join('\n')
: null,
input.sameDayChatMessages && input.sameDayChatMessages.length > 0
? [
'Additional same-day household chat history:',
...input.sameDayChatMessages.map((message) =>
message.threadId
? `[thread ${message.threadId}] ${message.speaker} (${message.role}): ${message.text}`
: `${message.speaker} (${message.role}): ${message.text}`
)
].join('\n')
: null
]
.filter(Boolean)

View File

@@ -5,7 +5,8 @@ import type { Logger } from '@household/observability'
import type {
HouseholdConfigurationRepository,
HouseholdTopicBindingRecord,
TelegramPendingActionRepository
TelegramPendingActionRepository,
TopicMessageHistoryRepository
} from '@household/ports'
import { getBotTranslations, type BotLocale } from './i18n'
@@ -25,6 +26,7 @@ import {
looksLikeDirectBotAddress,
type TopicMessageRouter
} from './topic-message-router'
import { historyRecordToTurn } from './topic-history'
import { stripExplicitBotMention } from './telegram-mentions'
const PAYMENT_TOPIC_CONFIRM_CALLBACK_PREFIX = 'payment_topic:confirm:'
@@ -215,6 +217,46 @@ function appendConversation(
})
}
async function listRecentThreadMessages(
repository: TopicMessageHistoryRepository | undefined,
record: PaymentTopicRecord
) {
if (!repository) {
return []
}
const messages = await repository.listRecentThreadMessages({
householdId: record.householdId,
telegramChatId: record.chatId,
telegramThreadId: record.threadId,
limit: 8
})
return messages.map(historyRecordToTurn)
}
async function persistIncomingTopicMessage(
repository: TopicMessageHistoryRepository | undefined,
record: PaymentTopicRecord
) {
if (!repository || record.rawText.trim().length === 0) {
return
}
await repository.saveMessage({
householdId: record.householdId,
telegramChatId: record.chatId,
telegramThreadId: record.threadId,
telegramMessageId: record.messageId,
telegramUpdateId: String(record.updateId),
senderTelegramUserId: record.senderTelegramUserId,
senderDisplayName: null,
isBot: false,
rawText: record.rawText.trim(),
messageSentAt: record.messageSentAt
})
}
async function routePaymentTopicMessage(input: {
record: PaymentTopicRecord
locale: BotLocale
@@ -225,6 +267,7 @@ async function routePaymentTopicMessage(input: {
assistantContext: string | null
assistantTone: string | null
memoryStore: AssistantConversationMemoryStore | undefined
historyRepository: TopicMessageHistoryRepository | undefined
router: TopicMessageRouter | undefined
}) {
if (!input.router) {
@@ -249,6 +292,8 @@ async function routePaymentTopicMessage(input: {
}
}
const recentThreadMessages = await listRecentThreadMessages(input.historyRepository, input.record)
return input.router({
locale: input.locale,
topicRole: input.topicRole,
@@ -258,7 +303,8 @@ async function routePaymentTopicMessage(input: {
activeWorkflow: input.activeWorkflow,
assistantContext: input.assistantContext,
assistantTone: input.assistantTone,
recentTurns: input.memoryStore?.get(memoryKeyForRecord(input.record)).turns ?? []
recentTurns: input.memoryStore?.get(memoryKeyForRecord(input.record)).turns ?? [],
recentThreadMessages
})
}
@@ -379,6 +425,7 @@ export function registerConfiguredPaymentTopicIngestion(
options: {
router?: TopicMessageRouter
memoryStore?: AssistantConversationMemoryStore
historyRepository?: TopicMessageHistoryRepository
logger?: Logger
} = {}
): void {
@@ -574,6 +621,7 @@ export function registerConfiguredPaymentTopicIngestion(
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone,
memoryStore: options.memoryStore,
historyRepository: options.historyRepository,
router: options.router
}))
cacheTopicMessageRoute(ctx, 'payments', route)
@@ -722,6 +770,8 @@ export function registerConfiguredPaymentTopicIngestion(
},
'Failed to ingest payment confirmation'
)
} finally {
await persistIncomingTopicMessage(options.historyRepository, record)
}
})
}

View File

@@ -1,9 +1,12 @@
import { describe, expect, test } from 'bun:test'
import { instantFromIso } from '@household/domain'
import type { HouseholdConfigurationRepository } from '@household/ports'
import type {
HouseholdConfigurationRepository,
TopicMessageHistoryRecord,
TopicMessageHistoryRepository
} from '@household/ports'
import { createTelegramBot } from './bot'
import { createInMemoryAssistantConversationMemoryStore } from './assistant-state'
import {
buildPurchaseAcknowledgement,
@@ -154,6 +157,37 @@ function createTestBot() {
return bot
}
function createTopicMessageHistoryRepository(): TopicMessageHistoryRepository {
const rows: TopicMessageHistoryRecord[] = []
return {
async saveMessage(input) {
rows.push(input)
},
async listRecentThreadMessages(input) {
return rows
.filter(
(row) =>
row.householdId === input.householdId &&
row.telegramChatId === input.telegramChatId &&
row.telegramThreadId === input.telegramThreadId
)
.slice(-input.limit)
},
async listRecentChatMessages(input) {
return rows
.filter(
(row) =>
row.householdId === input.householdId &&
row.telegramChatId === input.telegramChatId &&
row.messageSentAt &&
row.messageSentAt.epochMilliseconds >= input.sentAtOrAfter.epochMilliseconds
)
.slice(-input.limit)
}
}
}
describe('extractPurchaseTopicCandidate', () => {
test('returns record when message belongs to configured topic', () => {
const record = extractPurchaseTopicCandidate(candidate(), config)
@@ -1667,7 +1701,7 @@ Confirm or cancel below.`,
test('uses recent silent planning context for direct bot-address advice replies', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
const memoryStore = createInMemoryAssistantConversationMemoryStore(12)
const historyRepository = createTopicMessageHistoryRepository()
let sawDirectAddress = false
let recentTurnTexts: string[] = []
@@ -1699,7 +1733,7 @@ Confirm or cancel below.`,
}
registerPurchaseTopicIngestion(bot, config, repository, {
memoryStore,
historyRepository,
router: async (input) => {
if (input.messageText.includes('думаю купить')) {
return {
@@ -1714,7 +1748,7 @@ Confirm or cancel below.`,
}
sawDirectAddress = input.isExplicitMention
recentTurnTexts = input.recentTurns?.map((turn) => turn.text) ?? []
recentTurnTexts = input.recentThreadMessages?.map((turn) => turn.text) ?? []
return {
route: 'chat_reply',
@@ -1795,7 +1829,7 @@ Confirm or cancel below.`,
test('keeps silent planning context scoped to the current purchase thread', async () => {
const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = []
const memoryStore = createInMemoryAssistantConversationMemoryStore(12)
const historyRepository = createTopicMessageHistoryRepository()
let recentTurnTexts: string[] = []
bot.api.config.use(async (_prev, method, payload) => {
@@ -1874,7 +1908,7 @@ Confirm or cancel below.`,
householdConfigurationRepository as unknown as HouseholdConfigurationRepository,
repository,
{
memoryStore,
historyRepository,
router: async (input) => {
if (input.messageText.includes('картошки')) {
return {
@@ -1888,7 +1922,7 @@ Confirm or cancel below.`,
}
}
recentTurnTexts = input.recentTurns?.map((turn) => turn.text) ?? []
recentTurnTexts = input.recentThreadMessages?.map((turn) => turn.text) ?? []
return {
route: 'chat_reply',

View File

@@ -4,7 +4,8 @@ import type { Bot, Context } from 'grammy'
import type { Logger } from '@household/observability'
import type {
HouseholdConfigurationRepository,
HouseholdTopicBindingRecord
HouseholdTopicBindingRecord,
TopicMessageHistoryRepository
} from '@household/ports'
import { createDbClient, schema } from '@household/db'
@@ -22,6 +23,7 @@ import {
type TopicMessageRouter,
type TopicMessageRoutingResult
} from './topic-message-router'
import { historyRecordToTurn } from './topic-history'
import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions'
@@ -1476,6 +1478,46 @@ function rememberAssistantTurn(
})
}
async function listRecentThreadMessages(
repository: TopicMessageHistoryRepository | undefined,
record: PurchaseTopicRecord
) {
if (!repository) {
return []
}
const messages = await repository.listRecentThreadMessages({
householdId: record.householdId,
telegramChatId: record.chatId,
telegramThreadId: record.threadId,
limit: 8
})
return messages.map(historyRecordToTurn)
}
async function persistIncomingTopicMessage(
repository: TopicMessageHistoryRepository | undefined,
record: PurchaseTopicRecord
) {
if (!repository || record.rawText.trim().length === 0) {
return
}
await repository.saveMessage({
householdId: record.householdId,
telegramChatId: record.chatId,
telegramThreadId: record.threadId,
telegramMessageId: record.messageId,
telegramUpdateId: String(record.updateId),
senderTelegramUserId: record.senderTelegramUserId,
senderDisplayName: record.senderDisplayName ?? null,
isBot: false,
rawText: record.rawText.trim(),
messageSentAt: record.messageSentAt
})
}
async function routePurchaseTopicMessage(input: {
ctx: Pick<Context, 'msg' | 'me'>
record: PurchaseTopicRecord
@@ -1486,6 +1528,7 @@ async function routePurchaseTopicMessage(input: {
>
router: TopicMessageRouter | undefined
memoryStore: AssistantConversationMemoryStore | undefined
historyRepository: TopicMessageHistoryRepository | undefined
assistantContext?: string | null
assistantTone?: string | null
}): Promise<TopicMessageRoutingResult> {
@@ -1543,6 +1586,7 @@ async function routePurchaseTopicMessage(input: {
const key = memoryKeyForRecord(input.record)
const recentTurns = input.memoryStore?.get(key).turns ?? []
const recentThreadMessages = await listRecentThreadMessages(input.historyRepository, input.record)
return input.router({
locale: input.locale,
@@ -1557,7 +1601,8 @@ async function routePurchaseTopicMessage(input: {
: null,
assistantContext: input.assistantContext ?? null,
assistantTone: input.assistantTone ?? null,
recentTurns
recentTurns,
recentThreadMessages
})
}
@@ -1890,6 +1935,7 @@ export function registerPurchaseTopicIngestion(
interpreter?: PurchaseMessageInterpreter
router?: TopicMessageRouter
memoryStore?: AssistantConversationMemoryStore
historyRepository?: TopicMessageHistoryRepository
logger?: Logger
} = {}
): void {
@@ -1919,7 +1965,8 @@ export function registerPurchaseTopicIngestion(
locale: 'en',
repository,
router: options.router,
memoryStore: options.memoryStore
memoryStore: options.memoryStore,
historyRepository: options.historyRepository
}))
cacheTopicMessageRoute(ctx, 'purchase', route)
@@ -1980,6 +2027,7 @@ export function registerPurchaseTopicIngestion(
'Failed to ingest purchase topic message'
)
} finally {
await persistIncomingTopicMessage(options.historyRepository, record)
typingIndicator?.stop()
}
})
@@ -1993,6 +2041,7 @@ export function registerConfiguredPurchaseTopicIngestion(
interpreter?: PurchaseMessageInterpreter
router?: TopicMessageRouter
memoryStore?: AssistantConversationMemoryStore
historyRepository?: TopicMessageHistoryRepository
logger?: Logger
} = {}
): void {
@@ -2046,6 +2095,7 @@ export function registerConfiguredPurchaseTopicIngestion(
repository,
router: options.router,
memoryStore: options.memoryStore,
historyRepository: options.historyRepository,
assistantContext: assistantConfig.assistantContext,
assistantTone: assistantConfig.assistantTone
}))
@@ -2121,6 +2171,7 @@ export function registerConfiguredPurchaseTopicIngestion(
'Failed to ingest purchase topic message'
)
} finally {
await persistIncomingTopicMessage(options.historyRepository, record)
typingIndicator?.stop()
}
})

View File

@@ -0,0 +1,66 @@
import { nowInstant, Temporal, type Instant } from '@household/domain'
import type { TopicMessageHistoryRecord } from '@household/ports'
export interface TopicHistoryTurn {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}
const MEMORY_LOOKUP_PATTERN =
/\b(?:do you remember|remember|what were we talking about|what did we say today)\b|(?:^|[^\p{L}])(?:помнишь|ты\s+помнишь|что\s+мы\s+сегодня\s+обсуждали|о\s+чем\s+мы\s+говорили)(?=$|[^\p{L}])/iu
export function shouldLoadExpandedChatHistory(text: string): boolean {
return MEMORY_LOOKUP_PATTERN.test(text.trim())
}
export function startOfCurrentDayInTimezone(
timezone: string,
referenceInstant = nowInstant()
): Instant {
const zoned = referenceInstant.toZonedDateTimeISO(timezone)
const startOfDay = Temporal.ZonedDateTime.from({
timeZone: timezone,
year: zoned.year,
month: zoned.month,
day: zoned.day,
hour: 0,
minute: 0,
second: 0,
millisecond: 0,
microsecond: 0,
nanosecond: 0
})
return startOfDay.toInstant()
}
export function historyRecordToTurn(record: TopicMessageHistoryRecord): TopicHistoryTurn {
return {
role: record.isBot ? 'assistant' : 'user',
speaker: record.senderDisplayName ?? (record.isBot ? 'Kojori Bot' : 'Unknown'),
text: record.rawText.trim(),
threadId: record.telegramThreadId
}
}
export function formatThreadHistory(turns: readonly TopicHistoryTurn[]): string | null {
const lines = turns
.map((turn) => `${turn.speaker} (${turn.role}): ${turn.text}`)
.filter((line) => line.trim().length > 0)
return lines.length > 0 ? lines.join('\n') : null
}
export function formatSameDayChatHistory(turns: readonly TopicHistoryTurn[]): string | null {
const lines = turns
.map((turn) =>
turn.threadId
? `[thread ${turn.threadId}] ${turn.speaker} (${turn.role}): ${turn.text}`
: `${turn.speaker} (${turn.role}): ${turn.text}`
)
.filter((line) => line.trim().length > 0)
return lines.length > 0 ? lines.join('\n') : null
}

View File

@@ -31,6 +31,12 @@ export interface TopicMessageRoutingInput {
role: 'user' | 'assistant'
text: string
}[]
recentThreadMessages?: readonly {
role: 'user' | 'assistant'
speaker: string
text: string
threadId: string | null
}[]
}
export interface TopicMessageRoutingResult {
@@ -251,6 +257,17 @@ function buildRecentTurns(input: TopicMessageRoutingInput): string | null {
: null
}
function buildRecentThreadMessages(input: TopicMessageRoutingInput): string | null {
const recentMessages = input.recentThreadMessages
?.slice(-8)
.map((message) => `${message.speaker} (${message.role}): ${message.text.trim()}`)
.filter((line) => line.length > 0)
return recentMessages && recentMessages.length > 0
? ['Recent messages in this topic thread:', ...recentMessages].join('\n')
: null
}
export function cacheTopicMessageRoute(
ctx: Context,
topicRole: CachedTopicMessageRole,
@@ -305,6 +322,7 @@ export function createOpenAiTopicMessageRouter(
'When the user directly addresses the bot with small talk, joking, or testing, prefer chat_reply with one short sentence.',
'In a purchase topic, if the user is discussing a possible future purchase and asks for an opinion, prefer chat_reply with a short contextual opinion instead of a workflow.',
'Use the recent conversation when writing replyText. Do not ignore the already-established subject.',
'The recent thread messages are more important than the per-user memory summary.',
'If the user asks what you think about a price or quantity, mention the actual item/price from context when possible.',
'Use topic_helper only when the message is a real question or request that likely needs household knowledge or a topic-specific helper.',
'Use purchase_candidate only for a clear completed shared purchase.',
@@ -331,6 +349,7 @@ export function createOpenAiTopicMessageRouter(
`Reply to bot: ${input.isReplyToBot ? 'yes' : 'no'}`,
`Looks like direct address: ${looksLikeDirectBotAddress(input.messageText) ? 'yes' : 'no'}`,
`Active workflow: ${input.activeWorkflow ?? 'none'}`,
buildRecentThreadMessages(input),
buildRecentTurns(input),
`Latest message:\n${input.messageText}`
]

View File

@@ -4,3 +4,4 @@ export { createDbHouseholdConfigurationRepository } from './household-config-rep
export { createDbProcessedBotMessageRepository } from './processed-bot-message-repository'
export { createDbReminderDispatchRepository } from './reminder-dispatch-repository'
export { createDbTelegramPendingActionRepository } from './telegram-pending-action-repository'
export { createDbTopicMessageHistoryRepository } from './topic-message-history-repository'

View File

@@ -0,0 +1,99 @@
import { and, asc, desc, eq, gte, isNotNull } from 'drizzle-orm'
import { instantFromDatabaseValue, instantToDate } from '@household/domain'
import { createDbClient, schema } from '@household/db'
import type { TopicMessageHistoryRepository } from '@household/ports'
export function createDbTopicMessageHistoryRepository(databaseUrl: string): {
repository: TopicMessageHistoryRepository
close: () => Promise<void>
} {
const { db, queryClient } = createDbClient(databaseUrl, {
max: 3,
prepare: false
})
const repository: TopicMessageHistoryRepository = {
async saveMessage(input) {
await db
.insert(schema.topicMessages)
.values({
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
telegramMessageId: input.telegramMessageId,
telegramUpdateId: input.telegramUpdateId,
senderTelegramUserId: input.senderTelegramUserId,
senderDisplayName: input.senderDisplayName,
isBot: input.isBot ? 1 : 0,
rawText: input.rawText,
messageSentAt: input.messageSentAt ? instantToDate(input.messageSentAt) : null
})
.onConflictDoNothing()
},
async listRecentThreadMessages(input) {
const rows = await db
.select()
.from(schema.topicMessages)
.where(
and(
eq(schema.topicMessages.householdId, input.householdId),
eq(schema.topicMessages.telegramChatId, input.telegramChatId),
eq(schema.topicMessages.telegramThreadId, input.telegramThreadId)
)
)
.orderBy(desc(schema.topicMessages.messageSentAt), desc(schema.topicMessages.createdAt))
.limit(input.limit)
return rows.reverse().map((row) => ({
householdId: row.householdId,
telegramChatId: row.telegramChatId,
telegramThreadId: row.telegramThreadId,
telegramMessageId: row.telegramMessageId,
telegramUpdateId: row.telegramUpdateId,
senderTelegramUserId: row.senderTelegramUserId,
senderDisplayName: row.senderDisplayName,
isBot: row.isBot === 1,
rawText: row.rawText,
messageSentAt: instantFromDatabaseValue(row.messageSentAt)
}))
},
async listRecentChatMessages(input) {
const rows = await db
.select()
.from(schema.topicMessages)
.where(
and(
eq(schema.topicMessages.householdId, input.householdId),
eq(schema.topicMessages.telegramChatId, input.telegramChatId),
isNotNull(schema.topicMessages.messageSentAt),
gte(schema.topicMessages.messageSentAt, instantToDate(input.sentAtOrAfter))
)
)
.orderBy(asc(schema.topicMessages.messageSentAt), asc(schema.topicMessages.createdAt))
.limit(input.limit)
return rows.map((row) => ({
householdId: row.householdId,
telegramChatId: row.telegramChatId,
telegramThreadId: row.telegramThreadId,
telegramMessageId: row.telegramMessageId,
telegramUpdateId: row.telegramUpdateId,
senderTelegramUserId: row.senderTelegramUserId,
senderDisplayName: row.senderDisplayName,
isBot: row.isBot === 1,
rawText: row.rawText,
messageSentAt: instantFromDatabaseValue(row.messageSentAt)
}))
}
}
return {
repository,
close: async () => {
await queryClient.end({ timeout: 5 })
}
}
}

View File

@@ -19,6 +19,7 @@
"0015_white_owl.sql": "a9dec4c536c660d7eb0fcea42a3bedb1301408551977d098dff8324d7d5b26bd",
"0016_equal_susan_delgado.sql": "1698bf0516d16d2d7929dcb1bd2bb76d5a629eaba3d0bb2533c1ae926408de7a",
"0017_gigantic_selene.sql": "232d61b979675ddb97c9d69d14406dc15dd095ee6a332d3fa71d10416204fade",
"0018_nimble_kojori.sql": "818738e729119c6de8049dcfca562926a5dc6e321ecbbf9cf38e02bc70b5a0dc"
"0018_nimble_kojori.sql": "818738e729119c6de8049dcfca562926a5dc6e321ecbbf9cf38e02bc70b5a0dc",
"0019_faithful_madame_masque.sql": "38711341799b04a7c47fcc64fd19faf5b26e6f183d6a4c01d492b9929cd63641"
}
}

View File

@@ -0,0 +1,20 @@
CREATE TABLE "topic_messages" (
"id" uuid PRIMARY KEY DEFAULT gen_random_uuid() NOT NULL,
"household_id" uuid NOT NULL,
"telegram_chat_id" text NOT NULL,
"telegram_thread_id" text,
"telegram_message_id" text,
"telegram_update_id" text,
"sender_telegram_user_id" text,
"sender_display_name" text,
"is_bot" integer DEFAULT 0 NOT NULL,
"raw_text" text NOT NULL,
"message_sent_at" timestamp with time zone,
"created_at" timestamp with time zone DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "topic_messages" ADD CONSTRAINT "topic_messages_household_id_households_id_fk" FOREIGN KEY ("household_id") REFERENCES "public"."households"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "topic_messages_household_thread_sent_idx" ON "topic_messages" USING btree ("household_id","telegram_chat_id","telegram_thread_id","message_sent_at");--> statement-breakpoint
CREATE INDEX "topic_messages_household_chat_sent_idx" ON "topic_messages" USING btree ("household_id","telegram_chat_id","message_sent_at");--> statement-breakpoint
CREATE UNIQUE INDEX "topic_messages_household_tg_message_unique" ON "topic_messages" USING btree ("household_id","telegram_chat_id","telegram_message_id");--> statement-breakpoint
CREATE UNIQUE INDEX "topic_messages_household_tg_update_unique" ON "topic_messages" USING btree ("household_id","telegram_update_id");

File diff suppressed because it is too large Load Diff

View File

@@ -134,6 +134,13 @@
"when": 1773252000000,
"tag": "0018_nimble_kojori",
"breakpoints": true
},
{
"idx": 19,
"version": "7",
"when": 1773327708167,
"tag": "0019_faithful_madame_masque",
"breakpoints": true
}
]
}

View File

@@ -499,6 +499,48 @@ export const processedBotMessages = pgTable(
})
)
export const topicMessages = pgTable(
'topic_messages',
{
id: uuid('id').defaultRandom().primaryKey(),
householdId: uuid('household_id')
.notNull()
.references(() => households.id, { onDelete: 'cascade' }),
telegramChatId: text('telegram_chat_id').notNull(),
telegramThreadId: text('telegram_thread_id'),
telegramMessageId: text('telegram_message_id'),
telegramUpdateId: text('telegram_update_id'),
senderTelegramUserId: text('sender_telegram_user_id'),
senderDisplayName: text('sender_display_name'),
isBot: integer('is_bot').default(0).notNull(),
rawText: text('raw_text').notNull(),
messageSentAt: timestamp('message_sent_at', { withTimezone: true }),
createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull()
},
(table) => ({
householdThreadSentIdx: index('topic_messages_household_thread_sent_idx').on(
table.householdId,
table.telegramChatId,
table.telegramThreadId,
table.messageSentAt
),
householdChatSentIdx: index('topic_messages_household_chat_sent_idx').on(
table.householdId,
table.telegramChatId,
table.messageSentAt
),
householdMessageUnique: uniqueIndex('topic_messages_household_tg_message_unique').on(
table.householdId,
table.telegramChatId,
table.telegramMessageId
),
householdUpdateUnique: uniqueIndex('topic_messages_household_tg_update_unique').on(
table.householdId,
table.telegramUpdateId
)
})
)
export const anonymousMessages = pgTable(
'anonymous_messages',
{
@@ -682,6 +724,7 @@ export type BillingCycleExchangeRate = typeof billingCycleExchangeRates.$inferSe
export type UtilityBill = typeof utilityBills.$inferSelect
export type PurchaseEntry = typeof purchaseEntries.$inferSelect
export type PurchaseMessage = typeof purchaseMessages.$inferSelect
export type TopicMessage = typeof topicMessages.$inferSelect
export type AnonymousMessage = typeof anonymousMessages.$inferSelect
export type PaymentConfirmation = typeof paymentConfirmations.$inferSelect
export type PaymentRecord = typeof paymentRecords.$inferSelect

View File

@@ -66,3 +66,9 @@ export {
type TelegramPendingActionRepository,
type TelegramPendingActionType
} from './telegram-pending-actions'
export type {
ListRecentChatTopicMessagesInput,
ListRecentThreadTopicMessagesInput,
TopicMessageHistoryRecord,
TopicMessageHistoryRepository
} from './topic-message-history'

View File

@@ -0,0 +1,38 @@
import type { Instant } from '@household/domain'
export interface TopicMessageHistoryRecord {
householdId: string
telegramChatId: string
telegramThreadId: string | null
telegramMessageId: string | null
telegramUpdateId: string | null
senderTelegramUserId: string | null
senderDisplayName: string | null
isBot: boolean
rawText: string
messageSentAt: Instant | null
}
export interface ListRecentThreadTopicMessagesInput {
householdId: string
telegramChatId: string
telegramThreadId: string
limit: number
}
export interface ListRecentChatTopicMessagesInput {
householdId: string
telegramChatId: string
sentAtOrAfter: Instant
limit: number
}
export interface TopicMessageHistoryRepository {
saveMessage(input: TopicMessageHistoryRecord): Promise<void>
listRecentThreadMessages(
input: ListRecentThreadTopicMessagesInput
): Promise<readonly TopicMessageHistoryRecord[]>
listRecentChatMessages(
input: ListRecentChatTopicMessagesInput
): Promise<readonly TopicMessageHistoryRecord[]>
}