Fix topic message history persistence

This commit is contained in:
2026-03-12 19:21:37 +04:00
parent 23faeef738
commit b7fa489d24
7 changed files with 420 additions and 52 deletions

View File

@@ -1736,7 +1736,7 @@ Confirm or cancel below.`,
}) })
}) })
test('loads persisted thread and same-day chat history for memory-style prompts', async () => { test('loads persisted thread and same-day chat history including bot replies', async () => {
const bot = createTestBot() const bot = createTestBot()
const calls: Array<{ method: string; payload: unknown }> = [] const calls: Array<{ method: string; payload: unknown }> = []
const topicMessageHistoryRepository = createTopicMessageHistoryRepository() const topicMessageHistoryRepository = createTopicMessageHistoryRepository()
@@ -1800,9 +1800,12 @@ Confirm or cancel below.`,
await bot.handleUpdate(topicMessageUpdate('I think we need a TV in the house') as never) await bot.handleUpdate(topicMessageUpdate('I think we need a TV in the house') as never)
await bot.handleUpdate(topicMessageUpdate('Bot, do you remember what we said today?') as never) await bot.handleUpdate(topicMessageUpdate('Bot, do you remember what we said today?') as never)
await bot.handleUpdate(topicMessageUpdate('Bot, do you remember what you answered?') as never)
expect(recentThreadTexts).toContain('I think we need a TV in the house') expect(recentThreadTexts).toContain('I think we need a TV in the house')
expect(recentThreadTexts).toContain('Yes. You were discussing a TV for the house.')
expect(sameDayTexts).toContain('I think we need a TV in the house') expect(sameDayTexts).toContain('I think we need a TV in the house')
expect(sameDayTexts).toContain('Yes. You were discussing a TV for the house.')
expect(calls.at(-1)).toMatchObject({ expect(calls.at(-1)).toMatchObject({
method: 'sendMessage', method: 'sendMessage',
payload: { payload: {

View File

@@ -40,8 +40,11 @@ import {
} from './topic-message-router' } from './topic-message-router'
import { import {
historyRecordToTurn, historyRecordToTurn,
persistTopicHistoryMessage,
shouldLoadExpandedChatHistory, shouldLoadExpandedChatHistory,
startOfCurrentDayInTimezone startOfCurrentDayInTimezone,
telegramMessageIdFromMessage,
telegramMessageSentAtFromMessage
} from './topic-history' } from './topic-history'
import { startTypingIndicator } from './telegram-chat-action' import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions' import { stripExplicitBotMention } from './telegram-mentions'
@@ -392,12 +395,8 @@ async function persistIncomingTopicMessage(input: {
rawText: string rawText: string
messageSentAt: ReturnType<typeof currentMessageSentAt> messageSentAt: ReturnType<typeof currentMessageSentAt>
}) { }) {
const normalizedText = input.rawText.trim() await persistTopicHistoryMessage({
if (!input.repository || normalizedText.length === 0) { repository: input.repository,
return
}
await input.repository.saveMessage({
householdId: input.householdId, householdId: input.householdId,
telegramChatId: input.telegramChatId, telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId, telegramThreadId: input.telegramThreadId,
@@ -406,11 +405,39 @@ async function persistIncomingTopicMessage(input: {
senderTelegramUserId: input.senderTelegramUserId, senderTelegramUserId: input.senderTelegramUserId,
senderDisplayName: input.senderDisplayName, senderDisplayName: input.senderDisplayName,
isBot: false, isBot: false,
rawText: normalizedText, rawText: input.rawText,
messageSentAt: input.messageSentAt messageSentAt: input.messageSentAt
}) })
} }
async function replyAndPersistTopicMessage(input: {
ctx: Context
repository: TopicMessageHistoryRepository | undefined
householdId: string
telegramChatId: string
telegramThreadId: string | null
text: string
replyOptions?: Parameters<Context['reply']>[1]
}) {
const reply = await input.ctx.reply(input.text, input.replyOptions)
await persistTopicHistoryMessage({
repository: input.repository,
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
telegramMessageId: telegramMessageIdFromMessage(reply),
telegramUpdateId: null,
senderTelegramUserId: input.ctx.me?.id?.toString() ?? null,
senderDisplayName: null,
isBot: true,
rawText: input.text,
messageSentAt: telegramMessageSentAtFromMessage(reply)
})
return reply
}
async function routeGroupAssistantMessage(input: { async function routeGroupAssistantMessage(input: {
router: TopicMessageRouter | undefined router: TopicMessageRouter | undefined
locale: BotLocale locale: BotLocale
@@ -582,6 +609,8 @@ async function replyWithAssistant(input: {
memoryStore: AssistantConversationMemoryStore memoryStore: AssistantConversationMemoryStore
usageTracker: AssistantUsageTracker usageTracker: AssistantUsageTracker
logger: Logger | undefined logger: Logger | undefined
topicMessageHistoryRepository?: TopicMessageHistoryRepository
telegramThreadId: string | null
recentThreadMessages: readonly { recentThreadMessages: readonly {
role: 'user' | 'assistant' role: 'user' | 'assistant'
speaker: string speaker: string
@@ -598,6 +627,18 @@ async function replyWithAssistant(input: {
const t = getBotTranslations(input.locale).assistant const t = getBotTranslations(input.locale).assistant
if (!input.assistant) { if (!input.assistant) {
if (input.topicMessageHistoryRepository) {
await replyAndPersistTopicMessage({
ctx: input.ctx,
repository: input.topicMessageHistoryRepository,
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
text: t.unavailable
})
return
}
await input.ctx.reply(t.unavailable) await input.ctx.reply(t.unavailable)
return return
} }
@@ -672,6 +713,18 @@ async function replyWithAssistant(input: {
'Assistant reply generated' 'Assistant reply generated'
) )
if (input.topicMessageHistoryRepository) {
await replyAndPersistTopicMessage({
ctx: input.ctx,
repository: input.topicMessageHistoryRepository,
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
text: reply.text
})
return
}
await input.ctx.reply(reply.text) await input.ctx.reply(reply.text)
} catch (error) { } catch (error) {
input.logger?.error( input.logger?.error(
@@ -688,6 +741,18 @@ async function replyWithAssistant(input: {
}, },
'Assistant reply failed' 'Assistant reply failed'
) )
if (input.topicMessageHistoryRepository) {
await replyAndPersistTopicMessage({
ctx: input.ctx,
repository: input.topicMessageHistoryRepository,
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
text: t.unavailable
})
return
}
await input.ctx.reply(t.unavailable) await input.ctx.reply(t.unavailable)
} finally { } finally {
typingIndicator.stop() typingIndicator.stop()
@@ -1230,6 +1295,7 @@ export function registerDmAssistant(options: {
memoryStore: options.memoryStore, memoryStore: options.memoryStore,
usageTracker: options.usageTracker, usageTracker: options.usageTracker,
logger: options.logger, logger: options.logger,
telegramThreadId: null,
recentThreadMessages: [], recentThreadMessages: [],
sameDayChatMessages: [] sameDayChatMessages: []
}) })
@@ -1389,7 +1455,14 @@ export function registerDmAssistant(options: {
role: 'assistant', role: 'assistant',
text: route.replyText text: route.replyText
}) })
await ctx.reply(route.replyText) await replyAndPersistTopicMessage({
ctx,
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId,
text: route.replyText
})
} }
return return
} }
@@ -1429,14 +1502,29 @@ export function registerDmAssistant(options: {
null null
) )
await ctx.reply(purchaseText, { await replyAndPersistTopicMessage({
reply_markup: purchaseProposalReplyMarkup(locale, purchaseResult.purchaseMessageId) ctx,
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId,
text: purchaseText,
replyOptions: {
reply_markup: purchaseProposalReplyMarkup(locale, purchaseResult.purchaseMessageId)
}
}) })
return return
} }
if (purchaseResult.status === 'clarification_needed') { if (purchaseResult.status === 'clarification_needed') {
await ctx.reply(buildPurchaseClarificationText(locale, purchaseResult)) await replyAndPersistTopicMessage({
ctx,
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId,
text: buildPurchaseClarificationText(locale, purchaseResult)
})
return return
} }
} }
@@ -1451,7 +1539,14 @@ export function registerDmAssistant(options: {
const t = getBotTranslations(locale).assistant const t = getBotTranslations(locale).assistant
if (!rateLimit.allowed) { if (!rateLimit.allowed) {
await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs))) await replyAndPersistTopicMessage({
ctx,
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId,
text: t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs))
})
return return
} }
@@ -1464,7 +1559,14 @@ export function registerDmAssistant(options: {
}) })
if (paymentBalanceReply) { if (paymentBalanceReply) {
await ctx.reply(formatPaymentBalanceReplyText(locale, paymentBalanceReply)) await replyAndPersistTopicMessage({
ctx,
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId,
text: formatPaymentBalanceReplyText(locale, paymentBalanceReply)
})
return return
} }
@@ -1488,7 +1590,14 @@ export function registerDmAssistant(options: {
text: memberInsightReply text: memberInsightReply
}) })
await ctx.reply(memberInsightReply) await replyAndPersistTopicMessage({
ctx,
repository: options.topicMessageHistoryRepository,
householdId: household.householdId,
telegramChatId,
telegramThreadId,
text: memberInsightReply
})
return return
} }
@@ -1508,6 +1617,12 @@ export function registerDmAssistant(options: {
memoryStore: options.memoryStore, memoryStore: options.memoryStore,
usageTracker: options.usageTracker, usageTracker: options.usageTracker,
logger: options.logger, logger: options.logger,
telegramThreadId,
...(options.topicMessageHistoryRepository
? {
topicMessageHistoryRepository: options.topicMessageHistoryRepository
}
: {}),
recentThreadMessages, recentThreadMessages,
sameDayChatMessages: await listExpandedChatMessages({ sameDayChatMessages: await listExpandedChatMessages({
repository: options.topicMessageHistoryRepository, repository: options.topicMessageHistoryRepository,

View File

@@ -26,7 +26,12 @@ import {
looksLikeDirectBotAddress, looksLikeDirectBotAddress,
type TopicMessageRouter type TopicMessageRouter
} from './topic-message-router' } from './topic-message-router'
import { historyRecordToTurn } from './topic-history' import {
historyRecordToTurn,
persistTopicHistoryMessage,
telegramMessageIdFromMessage,
telegramMessageSentAtFromMessage
} from './topic-history'
import { stripExplicitBotMention } from './telegram-mentions' import { stripExplicitBotMention } from './telegram-mentions'
const PAYMENT_TOPIC_CONFIRM_CALLBACK_PREFIX = 'payment_topic:confirm:' const PAYMENT_TOPIC_CONFIRM_CALLBACK_PREFIX = 'payment_topic:confirm:'
@@ -239,11 +244,8 @@ async function persistIncomingTopicMessage(
repository: TopicMessageHistoryRepository | undefined, repository: TopicMessageHistoryRepository | undefined,
record: PaymentTopicRecord record: PaymentTopicRecord
) { ) {
if (!repository || record.rawText.trim().length === 0) { await persistTopicHistoryMessage({
return repository,
}
await repository.saveMessage({
householdId: record.householdId, householdId: record.householdId,
telegramChatId: record.chatId, telegramChatId: record.chatId,
telegramThreadId: record.threadId, telegramThreadId: record.threadId,
@@ -252,7 +254,7 @@ async function persistIncomingTopicMessage(
senderTelegramUserId: record.senderTelegramUserId, senderTelegramUserId: record.senderTelegramUserId,
senderDisplayName: null, senderDisplayName: null,
isBot: false, isBot: false,
rawText: record.rawText.trim(), rawText: record.rawText,
messageSentAt: record.messageSentAt messageSentAt: record.messageSentAt
}) })
} }
@@ -397,14 +399,18 @@ function paymentProposalReplyMarkup(locale: BotLocale, proposalId: string) {
async function replyToPaymentMessage( async function replyToPaymentMessage(
ctx: Context, ctx: Context,
text: string, text: string,
replyMarkup?: { inline_keyboard: Array<Array<{ text: string; callback_data: string }>> } replyMarkup?: { inline_keyboard: Array<Array<{ text: string; callback_data: string }>> },
history?: {
repository: TopicMessageHistoryRepository | undefined
record: PaymentTopicRecord
}
): Promise<void> { ): Promise<void> {
const message = ctx.msg const message = ctx.msg
if (!message) { if (!message) {
return return
} }
await ctx.reply(text, { const reply = await ctx.reply(text, {
reply_parameters: { reply_parameters: {
message_id: message.message_id message_id: message.message_id
}, },
@@ -414,6 +420,20 @@ async function replyToPaymentMessage(
} }
: {}) : {})
}) })
await persistTopicHistoryMessage({
repository: history?.repository,
householdId: history?.record.householdId ?? '',
telegramChatId: history?.record.chatId ?? '',
telegramThreadId: history?.record.threadId ?? null,
telegramMessageId: telegramMessageIdFromMessage(reply),
telegramUpdateId: null,
senderTelegramUserId: ctx.me?.id?.toString() ?? null,
senderDisplayName: null,
isBot: true,
rawText: text,
messageSentAt: telegramMessageSentAtFromMessage(reply)
})
} }
export function registerConfiguredPaymentTopicIngestion( export function registerConfiguredPaymentTopicIngestion(
@@ -637,7 +657,10 @@ export function registerConfiguredPaymentTopicIngestion(
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
if (route.replyText) { if (route.replyText) {
await replyToPaymentMessage(ctx, route.replyText) await replyToPaymentMessage(ctx, route.replyText, undefined, {
repository: options.historyRepository,
record
})
appendConversation(options.memoryStore, record, record.rawText, route.replyText) appendConversation(options.memoryStore, record, record.rawText, route.replyText)
} }
return return
@@ -665,7 +688,10 @@ export function registerConfiguredPaymentTopicIngestion(
} }
const helperText = formatPaymentBalanceReplyText(locale, balanceReply) const helperText = formatPaymentBalanceReplyText(locale, balanceReply)
await replyToPaymentMessage(ctx, helperText) await replyToPaymentMessage(ctx, helperText, undefined, {
repository: options.historyRepository,
record
})
appendConversation(options.memoryStore, record, record.rawText, helperText) appendConversation(options.memoryStore, record, record.rawText, helperText)
return return
} }
@@ -709,7 +735,10 @@ export function registerConfiguredPaymentTopicIngestion(
expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS })
}) })
await replyToPaymentMessage(ctx, t.clarification) await replyToPaymentMessage(ctx, t.clarification, undefined, {
repository: options.historyRepository,
record
})
appendConversation(options.memoryStore, record, record.rawText, t.clarification) appendConversation(options.memoryStore, record, record.rawText, t.clarification)
return return
} }
@@ -717,13 +746,19 @@ export function registerConfiguredPaymentTopicIngestion(
await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId)
if (proposal.status === 'unsupported_currency') { if (proposal.status === 'unsupported_currency') {
await replyToPaymentMessage(ctx, t.unsupportedCurrency) await replyToPaymentMessage(ctx, t.unsupportedCurrency, undefined, {
repository: options.historyRepository,
record
})
appendConversation(options.memoryStore, record, record.rawText, t.unsupportedCurrency) appendConversation(options.memoryStore, record, record.rawText, t.unsupportedCurrency)
return return
} }
if (proposal.status === 'no_balance') { if (proposal.status === 'no_balance') {
await replyToPaymentMessage(ctx, t.noBalance) await replyToPaymentMessage(ctx, t.noBalance, undefined, {
repository: options.historyRepository,
record
})
appendConversation(options.memoryStore, record, record.rawText, t.noBalance) appendConversation(options.memoryStore, record, record.rawText, t.noBalance)
return return
} }
@@ -754,7 +789,11 @@ export function registerConfiguredPaymentTopicIngestion(
await replyToPaymentMessage( await replyToPaymentMessage(
ctx, ctx,
proposalText, proposalText,
paymentProposalReplyMarkup(locale, proposal.payload.proposalId) paymentProposalReplyMarkup(locale, proposal.payload.proposalId),
{
repository: options.historyRepository,
record
}
) )
appendConversation(options.memoryStore, record, record.rawText, proposalText) appendConversation(options.memoryStore, record, record.rawText, proposalText)
} }

View File

@@ -1,4 +1,10 @@
import { instantFromEpochSeconds, instantToDate, Money, type Instant } from '@household/domain' import {
instantFromEpochSeconds,
instantToDate,
Money,
nowInstant,
type Instant
} from '@household/domain'
import { and, desc, eq } from 'drizzle-orm' import { and, desc, eq } from 'drizzle-orm'
import type { Bot, Context } from 'grammy' import type { Bot, Context } from 'grammy'
import type { Logger } from '@household/observability' import type { Logger } from '@household/observability'
@@ -23,7 +29,12 @@ import {
type TopicMessageRouter, type TopicMessageRouter,
type TopicMessageRoutingResult type TopicMessageRoutingResult
} from './topic-message-router' } from './topic-message-router'
import { historyRecordToTurn } from './topic-history' import {
historyRecordToTurn,
persistTopicHistoryMessage,
telegramMessageIdFromMessage,
telegramMessageSentAtFromMessage
} from './topic-history'
import { startTypingIndicator } from './telegram-chat-action' import { startTypingIndicator } from './telegram-chat-action'
import { stripExplicitBotMention } from './telegram-mentions' import { stripExplicitBotMention } from './telegram-mentions'
@@ -435,6 +446,10 @@ async function replyToPurchaseMessage(
callback_data: string callback_data: string
}> }>
> >
},
history?: {
repository: TopicMessageHistoryRepository | undefined
record: PurchaseTopicRecord
} }
): Promise<void> { ): Promise<void> {
const message = ctx.msg const message = ctx.msg
@@ -442,7 +457,7 @@ async function replyToPurchaseMessage(
return return
} }
await ctx.reply(text, { const reply = await ctx.reply(text, {
reply_parameters: { reply_parameters: {
message_id: message.message_id message_id: message.message_id
}, },
@@ -452,6 +467,20 @@ async function replyToPurchaseMessage(
} }
: {}) : {})
}) })
await persistTopicHistoryMessage({
repository: history?.repository,
householdId: history?.record.householdId ?? '',
telegramChatId: history?.record.chatId ?? '',
telegramThreadId: history?.record.threadId ?? null,
telegramMessageId: telegramMessageIdFromMessage(reply),
telegramUpdateId: null,
senderTelegramUserId: ctx.me?.id?.toString() ?? null,
senderDisplayName: null,
isBot: true,
rawText: text,
messageSentAt: telegramMessageSentAtFromMessage(reply)
})
} }
interface PendingPurchaseReply { interface PendingPurchaseReply {
@@ -511,6 +540,10 @@ async function finalizePurchaseReply(
callback_data: string callback_data: string
}> }>
> >
},
history?: {
repository: TopicMessageHistoryRepository | undefined
record: PurchaseTopicRecord
} }
): Promise<void> { ): Promise<void> {
if (!text) { if (!text) {
@@ -524,7 +557,7 @@ async function finalizePurchaseReply(
} }
if (!pendingReply) { if (!pendingReply) {
await replyToPurchaseMessage(ctx, text, replyMarkup) await replyToPurchaseMessage(ctx, text, replyMarkup, history)
return return
} }
@@ -535,8 +568,22 @@ async function finalizePurchaseReply(
text, text,
replyMarkup ? { reply_markup: replyMarkup } : {} replyMarkup ? { reply_markup: replyMarkup } : {}
) )
await persistTopicHistoryMessage({
repository: history?.repository,
householdId: history?.record.householdId ?? '',
telegramChatId: history?.record.chatId ?? '',
telegramThreadId: history?.record.threadId ?? null,
telegramMessageId: pendingReply.messageId.toString(),
telegramUpdateId: null,
senderTelegramUserId: ctx.me?.id?.toString() ?? null,
senderDisplayName: null,
isBot: true,
rawText: text,
messageSentAt: nowInstant()
})
} catch { } catch {
await replyToPurchaseMessage(ctx, text, replyMarkup) await replyToPurchaseMessage(ctx, text, replyMarkup, history)
} }
} }
@@ -1500,11 +1547,8 @@ async function persistIncomingTopicMessage(
repository: TopicMessageHistoryRepository | undefined, repository: TopicMessageHistoryRepository | undefined,
record: PurchaseTopicRecord record: PurchaseTopicRecord
) { ) {
if (!repository || record.rawText.trim().length === 0) { await persistTopicHistoryMessage({
return repository,
}
await repository.saveMessage({
householdId: record.householdId, householdId: record.householdId,
telegramChatId: record.chatId, telegramChatId: record.chatId,
telegramThreadId: record.threadId, telegramThreadId: record.threadId,
@@ -1513,7 +1557,7 @@ async function persistIncomingTopicMessage(
senderTelegramUserId: record.senderTelegramUserId, senderTelegramUserId: record.senderTelegramUserId,
senderDisplayName: record.senderDisplayName ?? null, senderDisplayName: record.senderDisplayName ?? null,
isBot: false, isBot: false,
rawText: record.rawText.trim(), rawText: record.rawText,
messageSentAt: record.messageSentAt messageSentAt: record.messageSentAt
}) })
} }
@@ -1612,7 +1656,8 @@ async function handlePurchaseMessageResult(
result: PurchaseMessageIngestionResult, result: PurchaseMessageIngestionResult,
locale: BotLocale, locale: BotLocale,
logger: Logger | undefined, logger: Logger | undefined,
pendingReply: PendingPurchaseReply | null = null pendingReply: PendingPurchaseReply | null = null,
historyRepository?: TopicMessageHistoryRepository
): Promise<void> { ): Promise<void> {
if (result.status !== 'duplicate') { if (result.status !== 'duplicate') {
logger?.info( logger?.info(
@@ -1644,6 +1689,12 @@ async function handlePurchaseMessageResult(
result.purchaseMessageId, result.purchaseMessageId,
result.participants result.participants
) )
: undefined,
historyRepository
? {
repository: historyRepository,
record
}
: undefined : undefined
) )
} }
@@ -1983,7 +2034,10 @@ export function registerPurchaseTopicIngestion(
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
rememberUserTurn(options.memoryStore, record) rememberUserTurn(options.memoryStore, record)
if (route.replyText) { if (route.replyText) {
await replyToPurchaseMessage(ctx, route.replyText) await replyToPurchaseMessage(ctx, route.replyText, undefined, {
repository: options.historyRepository,
record
})
rememberAssistantTurn(options.memoryStore, record, route.replyText) rememberAssistantTurn(options.memoryStore, record, route.replyText)
} }
return return
@@ -2012,7 +2066,15 @@ export function registerPurchaseTopicIngestion(
if (result.status === 'ignored_not_purchase') { if (result.status === 'ignored_not_purchase') {
return await next() return await next()
} }
await handlePurchaseMessageResult(ctx, record, result, 'en', options.logger, pendingReply) await handlePurchaseMessageResult(
ctx,
record,
result,
'en',
options.logger,
pendingReply,
options.historyRepository
)
rememberAssistantTurn(options.memoryStore, record, buildPurchaseAcknowledgement(result, 'en')) rememberAssistantTurn(options.memoryStore, record, buildPurchaseAcknowledgement(result, 'en'))
} catch (error) { } catch (error) {
options.logger?.error( options.logger?.error(
@@ -2114,7 +2176,10 @@ export function registerConfiguredPurchaseTopicIngestion(
if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') {
rememberUserTurn(options.memoryStore, record) rememberUserTurn(options.memoryStore, record)
if (route.replyText) { if (route.replyText) {
await replyToPurchaseMessage(ctx, route.replyText) await replyToPurchaseMessage(ctx, route.replyText, undefined, {
repository: options.historyRepository,
record
})
rememberAssistantTurn(options.memoryStore, record, route.replyText) rememberAssistantTurn(options.memoryStore, record, route.replyText)
} }
return return
@@ -2151,7 +2216,15 @@ export function registerConfiguredPurchaseTopicIngestion(
return await next() return await next()
} }
await handlePurchaseMessageResult(ctx, record, result, locale, options.logger, pendingReply) await handlePurchaseMessageResult(
ctx,
record,
result,
locale,
options.logger,
pendingReply,
options.historyRepository
)
rememberAssistantTurn( rememberAssistantTurn(
options.memoryStore, options.memoryStore,
record, record,

View File

@@ -1,5 +1,5 @@
import { nowInstant, Temporal, type Instant } from '@household/domain' import { instantFromEpochSeconds, nowInstant, Temporal, type Instant } from '@household/domain'
import type { TopicMessageHistoryRecord } from '@household/ports' import type { TopicMessageHistoryRecord, TopicMessageHistoryRepository } from '@household/ports'
export interface TopicHistoryTurn { export interface TopicHistoryTurn {
role: 'user' | 'assistant' role: 'user' | 'assistant'
@@ -45,6 +45,50 @@ export function historyRecordToTurn(record: TopicMessageHistoryRecord): TopicHis
} }
} }
export function telegramMessageIdFromMessage(
message: { message_id?: number } | null | undefined
): string | null {
return typeof message?.message_id === 'number' ? message.message_id.toString() : null
}
export function telegramMessageSentAtFromMessage(
message: { date?: number } | null | undefined
): Instant | null {
return typeof message?.date === 'number' ? instantFromEpochSeconds(message.date) : null
}
export async function persistTopicHistoryMessage(input: {
repository: TopicMessageHistoryRepository | undefined
householdId: string
telegramChatId: string
telegramThreadId: string | null
telegramMessageId: string | null
telegramUpdateId: string | null
senderTelegramUserId: string | null
senderDisplayName: string | null
isBot: boolean
rawText: string
messageSentAt: Instant | null
}) {
const normalizedText = input.rawText.trim()
if (!input.repository || normalizedText.length === 0) {
return
}
await input.repository.saveMessage({
householdId: input.householdId,
telegramChatId: input.telegramChatId,
telegramThreadId: input.telegramThreadId,
telegramMessageId: input.telegramMessageId,
telegramUpdateId: input.telegramUpdateId,
senderTelegramUserId: input.senderTelegramUserId,
senderDisplayName: input.senderDisplayName,
isBot: input.isBot,
rawText: normalizedText,
messageSentAt: input.messageSentAt
})
}
export function formatThreadHistory(turns: readonly TopicHistoryTurn[]): string | null { export function formatThreadHistory(turns: readonly TopicHistoryTurn[]): string | null {
const lines = turns const lines = turns
.map((turn) => `${turn.speaker} (${turn.role}): ${turn.text}`) .map((turn) => `${turn.speaker} (${turn.role}): ${turn.text}`)

View File

@@ -0,0 +1,94 @@
import { randomUUID } from 'node:crypto'
import { afterAll, describe, expect, test } from 'bun:test'
import { inArray } from 'drizzle-orm'
import { instantFromIso } from '@household/domain'
import { createDbClient, schema } from '@household/db'
import { createDbHouseholdConfigurationRepository } from './household-config-repository'
import { createDbTopicMessageHistoryRepository } from './topic-message-history-repository'
const databaseUrl = process.env.DATABASE_URL
const testIfDatabase = databaseUrl ? test : test.skip
describe('createDbTopicMessageHistoryRepository', () => {
const createdHouseholdIds: string[] = []
afterAll(async () => {
if (!databaseUrl || createdHouseholdIds.length === 0) {
return
}
const { db, queryClient } = createDbClient(databaseUrl, {
max: 1,
prepare: false
})
await db.delete(schema.households).where(inArray(schema.households.id, createdHouseholdIds))
await queryClient.end({ timeout: 5 })
})
testIfDatabase('lists the latest same-day chat messages in chronological order', async () => {
const householdClient = createDbHouseholdConfigurationRepository(databaseUrl!)
const historyClient = createDbTopicMessageHistoryRepository(databaseUrl!)
const telegramChatId = `-100${Date.now()}`
const registered = await householdClient.repository.registerTelegramHouseholdChat({
householdName: `History Household ${randomUUID()}`,
telegramChatId,
telegramChatType: 'supergroup',
title: 'History Household'
})
createdHouseholdIds.push(registered.household.householdId)
const baseMessage = {
householdId: registered.household.householdId,
telegramChatId,
telegramThreadId: '777',
senderTelegramUserId: '10002',
senderDisplayName: 'Mia',
isBot: false
} as const
await historyClient.repository.saveMessage({
...baseMessage,
telegramMessageId: 'msg-1',
telegramUpdateId: 'upd-1',
rawText: '08:00',
messageSentAt: instantFromIso('2026-03-05T08:00:00.000Z')
})
await historyClient.repository.saveMessage({
...baseMessage,
telegramMessageId: 'msg-2',
telegramUpdateId: 'upd-2',
rawText: '10:00',
messageSentAt: instantFromIso('2026-03-05T10:00:00.000Z')
})
await historyClient.repository.saveMessage({
...baseMessage,
telegramMessageId: 'msg-3',
telegramUpdateId: 'upd-3',
rawText: '11:00',
messageSentAt: instantFromIso('2026-03-05T11:00:00.000Z')
})
await historyClient.repository.saveMessage({
...baseMessage,
telegramMessageId: 'msg-4',
telegramUpdateId: 'upd-4',
rawText: '12:00',
messageSentAt: instantFromIso('2026-03-05T12:00:00.000Z')
})
const rows = await historyClient.repository.listRecentChatMessages({
householdId: registered.household.householdId,
telegramChatId,
sentAtOrAfter: instantFromIso('2026-03-05T00:00:00.000Z'),
limit: 2
})
expect(rows.map((row) => row.rawText)).toEqual(['11:00', '12:00'])
await householdClient.close()
await historyClient.close()
})
})

View File

@@ -1,4 +1,4 @@
import { and, asc, desc, eq, gte, isNotNull } from 'drizzle-orm' import { and, desc, eq, gte, isNotNull } from 'drizzle-orm'
import { instantFromDatabaseValue, instantToDate } from '@household/domain' import { instantFromDatabaseValue, instantToDate } from '@household/domain'
import { createDbClient, schema } from '@household/db' import { createDbClient, schema } from '@household/db'
@@ -72,10 +72,10 @@ export function createDbTopicMessageHistoryRepository(databaseUrl: string): {
gte(schema.topicMessages.messageSentAt, instantToDate(input.sentAtOrAfter)) gte(schema.topicMessages.messageSentAt, instantToDate(input.sentAtOrAfter))
) )
) )
.orderBy(asc(schema.topicMessages.messageSentAt), asc(schema.topicMessages.createdAt)) .orderBy(desc(schema.topicMessages.messageSentAt), desc(schema.topicMessages.createdAt))
.limit(input.limit) .limit(input.limit)
return rows.map((row) => ({ return rows.reverse().map((row) => ({
householdId: row.householdId, householdId: row.householdId,
telegramChatId: row.telegramChatId, telegramChatId: row.telegramChatId,
telegramThreadId: row.telegramThreadId, telegramThreadId: row.telegramThreadId,