diff --git a/apps/bot/src/dm-assistant.test.ts b/apps/bot/src/dm-assistant.test.ts index 7cd6d05..65bd55b 100644 --- a/apps/bot/src/dm-assistant.test.ts +++ b/apps/bot/src/dm-assistant.test.ts @@ -1736,7 +1736,7 @@ Confirm or cancel below.`, }) }) - test('loads persisted thread and same-day chat history for memory-style prompts', async () => { + test('loads persisted thread and same-day chat history including bot replies', async () => { const bot = createTestBot() const calls: Array<{ method: string; payload: unknown }> = [] const topicMessageHistoryRepository = createTopicMessageHistoryRepository() @@ -1800,9 +1800,12 @@ Confirm or cancel below.`, await bot.handleUpdate(topicMessageUpdate('I think we need a TV in the house') as never) await bot.handleUpdate(topicMessageUpdate('Bot, do you remember what we said today?') as never) + await bot.handleUpdate(topicMessageUpdate('Bot, do you remember what you answered?') as never) expect(recentThreadTexts).toContain('I think we need a TV in the house') + expect(recentThreadTexts).toContain('Yes. You were discussing a TV for the house.') expect(sameDayTexts).toContain('I think we need a TV in the house') + expect(sameDayTexts).toContain('Yes. You were discussing a TV for the house.') expect(calls.at(-1)).toMatchObject({ method: 'sendMessage', payload: { diff --git a/apps/bot/src/dm-assistant.ts b/apps/bot/src/dm-assistant.ts index fdf2748..4e4b396 100644 --- a/apps/bot/src/dm-assistant.ts +++ b/apps/bot/src/dm-assistant.ts @@ -40,8 +40,11 @@ import { } from './topic-message-router' import { historyRecordToTurn, + persistTopicHistoryMessage, shouldLoadExpandedChatHistory, - startOfCurrentDayInTimezone + startOfCurrentDayInTimezone, + telegramMessageIdFromMessage, + telegramMessageSentAtFromMessage } from './topic-history' import { startTypingIndicator } from './telegram-chat-action' import { stripExplicitBotMention } from './telegram-mentions' @@ -392,12 +395,8 @@ async function persistIncomingTopicMessage(input: { rawText: string messageSentAt: ReturnType }) { - const normalizedText = input.rawText.trim() - if (!input.repository || normalizedText.length === 0) { - return - } - - await input.repository.saveMessage({ + await persistTopicHistoryMessage({ + repository: input.repository, householdId: input.householdId, telegramChatId: input.telegramChatId, telegramThreadId: input.telegramThreadId, @@ -406,11 +405,39 @@ async function persistIncomingTopicMessage(input: { senderTelegramUserId: input.senderTelegramUserId, senderDisplayName: input.senderDisplayName, isBot: false, - rawText: normalizedText, + rawText: input.rawText, messageSentAt: input.messageSentAt }) } +async function replyAndPersistTopicMessage(input: { + ctx: Context + repository: TopicMessageHistoryRepository | undefined + householdId: string + telegramChatId: string + telegramThreadId: string | null + text: string + replyOptions?: Parameters[1] +}) { + const reply = await input.ctx.reply(input.text, input.replyOptions) + + await persistTopicHistoryMessage({ + repository: input.repository, + householdId: input.householdId, + telegramChatId: input.telegramChatId, + telegramThreadId: input.telegramThreadId, + telegramMessageId: telegramMessageIdFromMessage(reply), + telegramUpdateId: null, + senderTelegramUserId: input.ctx.me?.id?.toString() ?? null, + senderDisplayName: null, + isBot: true, + rawText: input.text, + messageSentAt: telegramMessageSentAtFromMessage(reply) + }) + + return reply +} + async function routeGroupAssistantMessage(input: { router: TopicMessageRouter | undefined locale: BotLocale @@ -582,6 +609,8 @@ async function replyWithAssistant(input: { memoryStore: AssistantConversationMemoryStore usageTracker: AssistantUsageTracker logger: Logger | undefined + topicMessageHistoryRepository?: TopicMessageHistoryRepository + telegramThreadId: string | null recentThreadMessages: readonly { role: 'user' | 'assistant' speaker: string @@ -598,6 +627,18 @@ async function replyWithAssistant(input: { const t = getBotTranslations(input.locale).assistant if (!input.assistant) { + if (input.topicMessageHistoryRepository) { + await replyAndPersistTopicMessage({ + ctx: input.ctx, + repository: input.topicMessageHistoryRepository, + householdId: input.householdId, + telegramChatId: input.telegramChatId, + telegramThreadId: input.telegramThreadId, + text: t.unavailable + }) + return + } + await input.ctx.reply(t.unavailable) return } @@ -672,6 +713,18 @@ async function replyWithAssistant(input: { 'Assistant reply generated' ) + if (input.topicMessageHistoryRepository) { + await replyAndPersistTopicMessage({ + ctx: input.ctx, + repository: input.topicMessageHistoryRepository, + householdId: input.householdId, + telegramChatId: input.telegramChatId, + telegramThreadId: input.telegramThreadId, + text: reply.text + }) + return + } + await input.ctx.reply(reply.text) } catch (error) { input.logger?.error( @@ -688,6 +741,18 @@ async function replyWithAssistant(input: { }, 'Assistant reply failed' ) + if (input.topicMessageHistoryRepository) { + await replyAndPersistTopicMessage({ + ctx: input.ctx, + repository: input.topicMessageHistoryRepository, + householdId: input.householdId, + telegramChatId: input.telegramChatId, + telegramThreadId: input.telegramThreadId, + text: t.unavailable + }) + return + } + await input.ctx.reply(t.unavailable) } finally { typingIndicator.stop() @@ -1230,6 +1295,7 @@ export function registerDmAssistant(options: { memoryStore: options.memoryStore, usageTracker: options.usageTracker, logger: options.logger, + telegramThreadId: null, recentThreadMessages: [], sameDayChatMessages: [] }) @@ -1389,7 +1455,14 @@ export function registerDmAssistant(options: { role: 'assistant', text: route.replyText }) - await ctx.reply(route.replyText) + await replyAndPersistTopicMessage({ + ctx, + repository: options.topicMessageHistoryRepository, + householdId: household.householdId, + telegramChatId, + telegramThreadId, + text: route.replyText + }) } return } @@ -1429,14 +1502,29 @@ export function registerDmAssistant(options: { null ) - await ctx.reply(purchaseText, { - reply_markup: purchaseProposalReplyMarkup(locale, purchaseResult.purchaseMessageId) + await replyAndPersistTopicMessage({ + ctx, + repository: options.topicMessageHistoryRepository, + householdId: household.householdId, + telegramChatId, + telegramThreadId, + text: purchaseText, + replyOptions: { + reply_markup: purchaseProposalReplyMarkup(locale, purchaseResult.purchaseMessageId) + } }) return } if (purchaseResult.status === 'clarification_needed') { - await ctx.reply(buildPurchaseClarificationText(locale, purchaseResult)) + await replyAndPersistTopicMessage({ + ctx, + repository: options.topicMessageHistoryRepository, + householdId: household.householdId, + telegramChatId, + telegramThreadId, + text: buildPurchaseClarificationText(locale, purchaseResult) + }) return } } @@ -1451,7 +1539,14 @@ export function registerDmAssistant(options: { const t = getBotTranslations(locale).assistant if (!rateLimit.allowed) { - await ctx.reply(t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs))) + await replyAndPersistTopicMessage({ + ctx, + repository: options.topicMessageHistoryRepository, + householdId: household.householdId, + telegramChatId, + telegramThreadId, + text: t.rateLimited(formatRetryDelay(locale, rateLimit.retryAfterMs)) + }) return } @@ -1464,7 +1559,14 @@ export function registerDmAssistant(options: { }) if (paymentBalanceReply) { - await ctx.reply(formatPaymentBalanceReplyText(locale, paymentBalanceReply)) + await replyAndPersistTopicMessage({ + ctx, + repository: options.topicMessageHistoryRepository, + householdId: household.householdId, + telegramChatId, + telegramThreadId, + text: formatPaymentBalanceReplyText(locale, paymentBalanceReply) + }) return } @@ -1488,7 +1590,14 @@ export function registerDmAssistant(options: { text: memberInsightReply }) - await ctx.reply(memberInsightReply) + await replyAndPersistTopicMessage({ + ctx, + repository: options.topicMessageHistoryRepository, + householdId: household.householdId, + telegramChatId, + telegramThreadId, + text: memberInsightReply + }) return } @@ -1508,6 +1617,12 @@ export function registerDmAssistant(options: { memoryStore: options.memoryStore, usageTracker: options.usageTracker, logger: options.logger, + telegramThreadId, + ...(options.topicMessageHistoryRepository + ? { + topicMessageHistoryRepository: options.topicMessageHistoryRepository + } + : {}), recentThreadMessages, sameDayChatMessages: await listExpandedChatMessages({ repository: options.topicMessageHistoryRepository, diff --git a/apps/bot/src/payment-topic-ingestion.ts b/apps/bot/src/payment-topic-ingestion.ts index a2d62c5..26d1832 100644 --- a/apps/bot/src/payment-topic-ingestion.ts +++ b/apps/bot/src/payment-topic-ingestion.ts @@ -26,7 +26,12 @@ import { looksLikeDirectBotAddress, type TopicMessageRouter } from './topic-message-router' -import { historyRecordToTurn } from './topic-history' +import { + historyRecordToTurn, + persistTopicHistoryMessage, + telegramMessageIdFromMessage, + telegramMessageSentAtFromMessage +} from './topic-history' import { stripExplicitBotMention } from './telegram-mentions' const PAYMENT_TOPIC_CONFIRM_CALLBACK_PREFIX = 'payment_topic:confirm:' @@ -239,11 +244,8 @@ async function persistIncomingTopicMessage( repository: TopicMessageHistoryRepository | undefined, record: PaymentTopicRecord ) { - if (!repository || record.rawText.trim().length === 0) { - return - } - - await repository.saveMessage({ + await persistTopicHistoryMessage({ + repository, householdId: record.householdId, telegramChatId: record.chatId, telegramThreadId: record.threadId, @@ -252,7 +254,7 @@ async function persistIncomingTopicMessage( senderTelegramUserId: record.senderTelegramUserId, senderDisplayName: null, isBot: false, - rawText: record.rawText.trim(), + rawText: record.rawText, messageSentAt: record.messageSentAt }) } @@ -397,14 +399,18 @@ function paymentProposalReplyMarkup(locale: BotLocale, proposalId: string) { async function replyToPaymentMessage( ctx: Context, text: string, - replyMarkup?: { inline_keyboard: Array> } + replyMarkup?: { inline_keyboard: Array> }, + history?: { + repository: TopicMessageHistoryRepository | undefined + record: PaymentTopicRecord + } ): Promise { const message = ctx.msg if (!message) { return } - await ctx.reply(text, { + const reply = await ctx.reply(text, { reply_parameters: { message_id: message.message_id }, @@ -414,6 +420,20 @@ async function replyToPaymentMessage( } : {}) }) + + await persistTopicHistoryMessage({ + repository: history?.repository, + householdId: history?.record.householdId ?? '', + telegramChatId: history?.record.chatId ?? '', + telegramThreadId: history?.record.threadId ?? null, + telegramMessageId: telegramMessageIdFromMessage(reply), + telegramUpdateId: null, + senderTelegramUserId: ctx.me?.id?.toString() ?? null, + senderDisplayName: null, + isBot: true, + rawText: text, + messageSentAt: telegramMessageSentAtFromMessage(reply) + }) } export function registerConfiguredPaymentTopicIngestion( @@ -637,7 +657,10 @@ export function registerConfiguredPaymentTopicIngestion( if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { if (route.replyText) { - await replyToPaymentMessage(ctx, route.replyText) + await replyToPaymentMessage(ctx, route.replyText, undefined, { + repository: options.historyRepository, + record + }) appendConversation(options.memoryStore, record, record.rawText, route.replyText) } return @@ -665,7 +688,10 @@ export function registerConfiguredPaymentTopicIngestion( } const helperText = formatPaymentBalanceReplyText(locale, balanceReply) - await replyToPaymentMessage(ctx, helperText) + await replyToPaymentMessage(ctx, helperText, undefined, { + repository: options.historyRepository, + record + }) appendConversation(options.memoryStore, record, record.rawText, helperText) return } @@ -709,7 +735,10 @@ export function registerConfiguredPaymentTopicIngestion( expiresAt: nowInstant().add({ milliseconds: PAYMENT_TOPIC_ACTION_TTL_MS }) }) - await replyToPaymentMessage(ctx, t.clarification) + await replyToPaymentMessage(ctx, t.clarification, undefined, { + repository: options.historyRepository, + record + }) appendConversation(options.memoryStore, record, record.rawText, t.clarification) return } @@ -717,13 +746,19 @@ export function registerConfiguredPaymentTopicIngestion( await promptRepository.clearPendingAction(record.chatId, record.senderTelegramUserId) if (proposal.status === 'unsupported_currency') { - await replyToPaymentMessage(ctx, t.unsupportedCurrency) + await replyToPaymentMessage(ctx, t.unsupportedCurrency, undefined, { + repository: options.historyRepository, + record + }) appendConversation(options.memoryStore, record, record.rawText, t.unsupportedCurrency) return } if (proposal.status === 'no_balance') { - await replyToPaymentMessage(ctx, t.noBalance) + await replyToPaymentMessage(ctx, t.noBalance, undefined, { + repository: options.historyRepository, + record + }) appendConversation(options.memoryStore, record, record.rawText, t.noBalance) return } @@ -754,7 +789,11 @@ export function registerConfiguredPaymentTopicIngestion( await replyToPaymentMessage( ctx, proposalText, - paymentProposalReplyMarkup(locale, proposal.payload.proposalId) + paymentProposalReplyMarkup(locale, proposal.payload.proposalId), + { + repository: options.historyRepository, + record + } ) appendConversation(options.memoryStore, record, record.rawText, proposalText) } diff --git a/apps/bot/src/purchase-topic-ingestion.ts b/apps/bot/src/purchase-topic-ingestion.ts index 93c3eea..08eeb51 100644 --- a/apps/bot/src/purchase-topic-ingestion.ts +++ b/apps/bot/src/purchase-topic-ingestion.ts @@ -1,4 +1,10 @@ -import { instantFromEpochSeconds, instantToDate, Money, type Instant } from '@household/domain' +import { + instantFromEpochSeconds, + instantToDate, + Money, + nowInstant, + type Instant +} from '@household/domain' import { and, desc, eq } from 'drizzle-orm' import type { Bot, Context } from 'grammy' import type { Logger } from '@household/observability' @@ -23,7 +29,12 @@ import { type TopicMessageRouter, type TopicMessageRoutingResult } from './topic-message-router' -import { historyRecordToTurn } from './topic-history' +import { + historyRecordToTurn, + persistTopicHistoryMessage, + telegramMessageIdFromMessage, + telegramMessageSentAtFromMessage +} from './topic-history' import { startTypingIndicator } from './telegram-chat-action' import { stripExplicitBotMention } from './telegram-mentions' @@ -435,6 +446,10 @@ async function replyToPurchaseMessage( callback_data: string }> > + }, + history?: { + repository: TopicMessageHistoryRepository | undefined + record: PurchaseTopicRecord } ): Promise { const message = ctx.msg @@ -442,7 +457,7 @@ async function replyToPurchaseMessage( return } - await ctx.reply(text, { + const reply = await ctx.reply(text, { reply_parameters: { message_id: message.message_id }, @@ -452,6 +467,20 @@ async function replyToPurchaseMessage( } : {}) }) + + await persistTopicHistoryMessage({ + repository: history?.repository, + householdId: history?.record.householdId ?? '', + telegramChatId: history?.record.chatId ?? '', + telegramThreadId: history?.record.threadId ?? null, + telegramMessageId: telegramMessageIdFromMessage(reply), + telegramUpdateId: null, + senderTelegramUserId: ctx.me?.id?.toString() ?? null, + senderDisplayName: null, + isBot: true, + rawText: text, + messageSentAt: telegramMessageSentAtFromMessage(reply) + }) } interface PendingPurchaseReply { @@ -511,6 +540,10 @@ async function finalizePurchaseReply( callback_data: string }> > + }, + history?: { + repository: TopicMessageHistoryRepository | undefined + record: PurchaseTopicRecord } ): Promise { if (!text) { @@ -524,7 +557,7 @@ async function finalizePurchaseReply( } if (!pendingReply) { - await replyToPurchaseMessage(ctx, text, replyMarkup) + await replyToPurchaseMessage(ctx, text, replyMarkup, history) return } @@ -535,8 +568,22 @@ async function finalizePurchaseReply( text, replyMarkup ? { reply_markup: replyMarkup } : {} ) + + await persistTopicHistoryMessage({ + repository: history?.repository, + householdId: history?.record.householdId ?? '', + telegramChatId: history?.record.chatId ?? '', + telegramThreadId: history?.record.threadId ?? null, + telegramMessageId: pendingReply.messageId.toString(), + telegramUpdateId: null, + senderTelegramUserId: ctx.me?.id?.toString() ?? null, + senderDisplayName: null, + isBot: true, + rawText: text, + messageSentAt: nowInstant() + }) } catch { - await replyToPurchaseMessage(ctx, text, replyMarkup) + await replyToPurchaseMessage(ctx, text, replyMarkup, history) } } @@ -1500,11 +1547,8 @@ async function persistIncomingTopicMessage( repository: TopicMessageHistoryRepository | undefined, record: PurchaseTopicRecord ) { - if (!repository || record.rawText.trim().length === 0) { - return - } - - await repository.saveMessage({ + await persistTopicHistoryMessage({ + repository, householdId: record.householdId, telegramChatId: record.chatId, telegramThreadId: record.threadId, @@ -1513,7 +1557,7 @@ async function persistIncomingTopicMessage( senderTelegramUserId: record.senderTelegramUserId, senderDisplayName: record.senderDisplayName ?? null, isBot: false, - rawText: record.rawText.trim(), + rawText: record.rawText, messageSentAt: record.messageSentAt }) } @@ -1612,7 +1656,8 @@ async function handlePurchaseMessageResult( result: PurchaseMessageIngestionResult, locale: BotLocale, logger: Logger | undefined, - pendingReply: PendingPurchaseReply | null = null + pendingReply: PendingPurchaseReply | null = null, + historyRepository?: TopicMessageHistoryRepository ): Promise { if (result.status !== 'duplicate') { logger?.info( @@ -1644,6 +1689,12 @@ async function handlePurchaseMessageResult( result.purchaseMessageId, result.participants ) + : undefined, + historyRepository + ? { + repository: historyRepository, + record + } : undefined ) } @@ -1983,7 +2034,10 @@ export function registerPurchaseTopicIngestion( if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { rememberUserTurn(options.memoryStore, record) if (route.replyText) { - await replyToPurchaseMessage(ctx, route.replyText) + await replyToPurchaseMessage(ctx, route.replyText, undefined, { + repository: options.historyRepository, + record + }) rememberAssistantTurn(options.memoryStore, record, route.replyText) } return @@ -2012,7 +2066,15 @@ export function registerPurchaseTopicIngestion( if (result.status === 'ignored_not_purchase') { return await next() } - await handlePurchaseMessageResult(ctx, record, result, 'en', options.logger, pendingReply) + await handlePurchaseMessageResult( + ctx, + record, + result, + 'en', + options.logger, + pendingReply, + options.historyRepository + ) rememberAssistantTurn(options.memoryStore, record, buildPurchaseAcknowledgement(result, 'en')) } catch (error) { options.logger?.error( @@ -2114,7 +2176,10 @@ export function registerConfiguredPurchaseTopicIngestion( if (route.route === 'chat_reply' || route.route === 'dismiss_workflow') { rememberUserTurn(options.memoryStore, record) if (route.replyText) { - await replyToPurchaseMessage(ctx, route.replyText) + await replyToPurchaseMessage(ctx, route.replyText, undefined, { + repository: options.historyRepository, + record + }) rememberAssistantTurn(options.memoryStore, record, route.replyText) } return @@ -2151,7 +2216,15 @@ export function registerConfiguredPurchaseTopicIngestion( return await next() } - await handlePurchaseMessageResult(ctx, record, result, locale, options.logger, pendingReply) + await handlePurchaseMessageResult( + ctx, + record, + result, + locale, + options.logger, + pendingReply, + options.historyRepository + ) rememberAssistantTurn( options.memoryStore, record, diff --git a/apps/bot/src/topic-history.ts b/apps/bot/src/topic-history.ts index 851e5fd..945cee9 100644 --- a/apps/bot/src/topic-history.ts +++ b/apps/bot/src/topic-history.ts @@ -1,5 +1,5 @@ -import { nowInstant, Temporal, type Instant } from '@household/domain' -import type { TopicMessageHistoryRecord } from '@household/ports' +import { instantFromEpochSeconds, nowInstant, Temporal, type Instant } from '@household/domain' +import type { TopicMessageHistoryRecord, TopicMessageHistoryRepository } from '@household/ports' export interface TopicHistoryTurn { role: 'user' | 'assistant' @@ -45,6 +45,50 @@ export function historyRecordToTurn(record: TopicMessageHistoryRecord): TopicHis } } +export function telegramMessageIdFromMessage( + message: { message_id?: number } | null | undefined +): string | null { + return typeof message?.message_id === 'number' ? message.message_id.toString() : null +} + +export function telegramMessageSentAtFromMessage( + message: { date?: number } | null | undefined +): Instant | null { + return typeof message?.date === 'number' ? instantFromEpochSeconds(message.date) : null +} + +export async function persistTopicHistoryMessage(input: { + repository: TopicMessageHistoryRepository | undefined + householdId: string + telegramChatId: string + telegramThreadId: string | null + telegramMessageId: string | null + telegramUpdateId: string | null + senderTelegramUserId: string | null + senderDisplayName: string | null + isBot: boolean + rawText: string + messageSentAt: Instant | null +}) { + const normalizedText = input.rawText.trim() + if (!input.repository || normalizedText.length === 0) { + return + } + + await input.repository.saveMessage({ + householdId: input.householdId, + telegramChatId: input.telegramChatId, + telegramThreadId: input.telegramThreadId, + telegramMessageId: input.telegramMessageId, + telegramUpdateId: input.telegramUpdateId, + senderTelegramUserId: input.senderTelegramUserId, + senderDisplayName: input.senderDisplayName, + isBot: input.isBot, + rawText: normalizedText, + messageSentAt: input.messageSentAt + }) +} + export function formatThreadHistory(turns: readonly TopicHistoryTurn[]): string | null { const lines = turns .map((turn) => `${turn.speaker} (${turn.role}): ${turn.text}`) diff --git a/packages/adapters-db/src/topic-message-history-repository.test.ts b/packages/adapters-db/src/topic-message-history-repository.test.ts new file mode 100644 index 0000000..673706b --- /dev/null +++ b/packages/adapters-db/src/topic-message-history-repository.test.ts @@ -0,0 +1,94 @@ +import { randomUUID } from 'node:crypto' +import { afterAll, describe, expect, test } from 'bun:test' +import { inArray } from 'drizzle-orm' + +import { instantFromIso } from '@household/domain' +import { createDbClient, schema } from '@household/db' + +import { createDbHouseholdConfigurationRepository } from './household-config-repository' +import { createDbTopicMessageHistoryRepository } from './topic-message-history-repository' + +const databaseUrl = process.env.DATABASE_URL +const testIfDatabase = databaseUrl ? test : test.skip + +describe('createDbTopicMessageHistoryRepository', () => { + const createdHouseholdIds: string[] = [] + + afterAll(async () => { + if (!databaseUrl || createdHouseholdIds.length === 0) { + return + } + + const { db, queryClient } = createDbClient(databaseUrl, { + max: 1, + prepare: false + }) + + await db.delete(schema.households).where(inArray(schema.households.id, createdHouseholdIds)) + await queryClient.end({ timeout: 5 }) + }) + + testIfDatabase('lists the latest same-day chat messages in chronological order', async () => { + const householdClient = createDbHouseholdConfigurationRepository(databaseUrl!) + const historyClient = createDbTopicMessageHistoryRepository(databaseUrl!) + const telegramChatId = `-100${Date.now()}` + const registered = await householdClient.repository.registerTelegramHouseholdChat({ + householdName: `History Household ${randomUUID()}`, + telegramChatId, + telegramChatType: 'supergroup', + title: 'History Household' + }) + + createdHouseholdIds.push(registered.household.householdId) + + const baseMessage = { + householdId: registered.household.householdId, + telegramChatId, + telegramThreadId: '777', + senderTelegramUserId: '10002', + senderDisplayName: 'Mia', + isBot: false + } as const + + await historyClient.repository.saveMessage({ + ...baseMessage, + telegramMessageId: 'msg-1', + telegramUpdateId: 'upd-1', + rawText: '08:00', + messageSentAt: instantFromIso('2026-03-05T08:00:00.000Z') + }) + await historyClient.repository.saveMessage({ + ...baseMessage, + telegramMessageId: 'msg-2', + telegramUpdateId: 'upd-2', + rawText: '10:00', + messageSentAt: instantFromIso('2026-03-05T10:00:00.000Z') + }) + await historyClient.repository.saveMessage({ + ...baseMessage, + telegramMessageId: 'msg-3', + telegramUpdateId: 'upd-3', + rawText: '11:00', + messageSentAt: instantFromIso('2026-03-05T11:00:00.000Z') + }) + await historyClient.repository.saveMessage({ + ...baseMessage, + telegramMessageId: 'msg-4', + telegramUpdateId: 'upd-4', + rawText: '12:00', + messageSentAt: instantFromIso('2026-03-05T12:00:00.000Z') + }) + + const rows = await historyClient.repository.listRecentChatMessages({ + householdId: registered.household.householdId, + telegramChatId, + sentAtOrAfter: instantFromIso('2026-03-05T00:00:00.000Z'), + limit: 2 + }) + + expect(rows.map((row) => row.rawText)).toEqual(['11:00', '12:00']) + + await householdClient.close() + await historyClient.close() + }) +}) diff --git a/packages/adapters-db/src/topic-message-history-repository.ts b/packages/adapters-db/src/topic-message-history-repository.ts index 8d6576c..0e57c68 100644 --- a/packages/adapters-db/src/topic-message-history-repository.ts +++ b/packages/adapters-db/src/topic-message-history-repository.ts @@ -1,4 +1,4 @@ -import { and, asc, desc, eq, gte, isNotNull } from 'drizzle-orm' +import { and, desc, eq, gte, isNotNull } from 'drizzle-orm' import { instantFromDatabaseValue, instantToDate } from '@household/domain' import { createDbClient, schema } from '@household/db' @@ -72,10 +72,10 @@ export function createDbTopicMessageHistoryRepository(databaseUrl: string): { gte(schema.topicMessages.messageSentAt, instantToDate(input.sentAtOrAfter)) ) ) - .orderBy(asc(schema.topicMessages.messageSentAt), asc(schema.topicMessages.createdAt)) + .orderBy(desc(schema.topicMessages.messageSentAt), desc(schema.topicMessages.createdAt)) .limit(input.limit) - return rows.map((row) => ({ + return rows.reverse().map((row) => ({ householdId: row.householdId, telegramChatId: row.telegramChatId, telegramThreadId: row.telegramThreadId,