refactor(bot): replace reminder polling with scheduled dispatches

This commit is contained in:
2026-03-24 20:51:54 +04:00
parent a1acec5e60
commit 7f836eeee2
48 changed files with 6425 additions and 1557 deletions

View File

@@ -10,6 +10,7 @@
"lint": "oxlint \"src\""
},
"dependencies": {
"@aws-sdk/client-scheduler": "^3.913.0",
"@household/adapters-db": "workspace:*",
"@household/application": "workspace:*",
"@household/db": "workspace:*",

View File

@@ -1,118 +0,0 @@
import { describe, expect, test } from 'bun:test'
import { Temporal } from '@household/domain'
import type { AdHocNotificationService, DeliverableAdHocNotification } from '@household/application'
import { createAdHocNotificationJobsHandler } from './ad-hoc-notification-jobs'
function dueNotification(
input: Partial<DeliverableAdHocNotification['notification']> = {}
): DeliverableAdHocNotification {
return {
notification: {
id: input.id ?? 'notif-1',
householdId: input.householdId ?? 'household-1',
creatorMemberId: input.creatorMemberId ?? 'creator',
assigneeMemberId: input.assigneeMemberId ?? 'assignee',
originalRequestText: 'raw',
notificationText:
input.notificationText ?? 'Dima, time to check whether Georgiy has called already.',
timezone: input.timezone ?? 'Asia/Tbilisi',
scheduledFor: input.scheduledFor ?? Temporal.Instant.from('2026-03-23T09:00:00Z'),
timePrecision: input.timePrecision ?? 'exact',
deliveryMode: input.deliveryMode ?? 'topic',
dmRecipientMemberIds: input.dmRecipientMemberIds ?? [],
friendlyTagAssignee: input.friendlyTagAssignee ?? true,
status: input.status ?? 'scheduled',
sourceTelegramChatId: null,
sourceTelegramThreadId: null,
sentAt: null,
cancelledAt: null,
cancelledByMemberId: null,
createdAt: Temporal.Instant.from('2026-03-22T09:00:00Z'),
updatedAt: Temporal.Instant.from('2026-03-22T09:00:00Z')
},
creator: {
memberId: 'creator',
telegramUserId: '111',
displayName: 'Dima'
},
assignee: {
memberId: 'assignee',
telegramUserId: '222',
displayName: 'Georgiy'
},
dmRecipients: [
{
memberId: 'recipient',
telegramUserId: '333',
displayName: 'Alice'
}
]
}
}
describe('createAdHocNotificationJobsHandler', () => {
test('delivers topic notifications and marks them sent', async () => {
const sentTopicMessages: string[] = []
const sentNotifications: string[] = []
const service: AdHocNotificationService = {
scheduleNotification: async () => {
throw new Error('not used')
},
listUpcomingNotifications: async () => [],
cancelNotification: async () => ({ status: 'not_found' }),
updateNotification: async () => ({ status: 'not_found' }),
listDueNotifications: async () => [dueNotification()],
claimDueNotification: async () => true,
releaseDueNotification: async () => {},
markNotificationSent: async (notificationId) => {
sentNotifications.push(notificationId)
return null
}
}
const handler = createAdHocNotificationJobsHandler({
notificationService: service,
householdConfigurationRepository: {
async getHouseholdChatByHouseholdId() {
return {
householdId: 'household-1',
householdName: 'Kojori',
telegramChatId: '777',
telegramChatType: 'supergroup',
title: 'Kojori',
defaultLocale: 'ru'
}
},
async getHouseholdTopicBinding() {
return {
householdId: 'household-1',
role: 'reminders',
telegramThreadId: '103',
topicName: 'Reminders'
}
}
},
sendTopicMessage: async (input) => {
sentTopicMessages.push(`${input.chatId}:${input.threadId}:${input.text}`)
},
sendDirectMessage: async () => {}
})
const response = await handler.handle(
new Request('http://localhost/jobs/notifications/due', {
method: 'POST'
})
)
const payload = (await response.json()) as { ok: boolean; notifications: { outcome: string }[] }
expect(payload.ok).toBe(true)
expect(payload.notifications[0]?.outcome).toBe('sent')
expect(sentTopicMessages[0]).toContain(
'Dima, time to check whether Georgiy has called already.'
)
expect(sentNotifications).toEqual(['notif-1'])
})
})

View File

@@ -1,192 +0,0 @@
import type { AdHocNotificationService, DeliverableAdHocNotification } from '@household/application'
import { nowInstant } from '@household/domain'
import type { Logger } from '@household/observability'
import type { HouseholdConfigurationRepository } from '@household/ports'
import { buildTopicNotificationText } from './ad-hoc-notifications'
interface DueNotificationJobRequestBody {
dryRun?: boolean
jobId?: string
}
function json(body: object, status = 200): Response {
return new Response(JSON.stringify(body), {
status,
headers: {
'content-type': 'application/json; charset=utf-8'
}
})
}
async function readBody(request: Request): Promise<DueNotificationJobRequestBody> {
const text = await request.text()
if (text.trim().length === 0) {
return {}
}
try {
return JSON.parse(text) as DueNotificationJobRequestBody
} catch {
throw new Error('Invalid JSON body')
}
}
export function createAdHocNotificationJobsHandler(options: {
notificationService: AdHocNotificationService
householdConfigurationRepository: Pick<
HouseholdConfigurationRepository,
'getHouseholdChatByHouseholdId' | 'getHouseholdTopicBinding'
>
sendTopicMessage: (input: {
householdId: string
chatId: string
threadId: string | null
text: string
parseMode?: 'HTML'
}) => Promise<void>
sendDirectMessage: (input: { telegramUserId: string; text: string }) => Promise<void>
logger?: Logger
}): {
handle: (request: Request) => Promise<Response>
} {
async function deliver(notification: DeliverableAdHocNotification) {
switch (notification.notification.deliveryMode) {
case 'topic': {
const [chat, reminderTopic] = await Promise.all([
options.householdConfigurationRepository.getHouseholdChatByHouseholdId(
notification.notification.householdId
),
options.householdConfigurationRepository.getHouseholdTopicBinding(
notification.notification.householdId,
'reminders'
)
])
if (!chat) {
throw new Error(
`Household chat not configured for ${notification.notification.householdId}`
)
}
const content = buildTopicNotificationText({
notificationText: notification.notification.notificationText
})
await options.sendTopicMessage({
householdId: notification.notification.householdId,
chatId: chat.telegramChatId,
threadId: reminderTopic?.telegramThreadId ?? null,
text: content.text,
parseMode: content.parseMode
})
return
}
case 'dm_all':
case 'dm_selected': {
for (const recipient of notification.dmRecipients) {
await options.sendDirectMessage({
telegramUserId: recipient.telegramUserId,
text: notification.notification.notificationText
})
}
return
}
}
}
return {
handle: async (request) => {
if (request.method !== 'POST') {
return json({ ok: false, error: 'Method Not Allowed' }, 405)
}
try {
const body = await readBody(request)
const now = nowInstant()
const due = await options.notificationService.listDueNotifications(now)
const dispatches: Array<{
notificationId: string
householdId: string
outcome: 'dry-run' | 'sent' | 'duplicate' | 'failed'
error?: string
}> = []
for (const notification of due) {
if (body.dryRun === true) {
dispatches.push({
notificationId: notification.notification.id,
householdId: notification.notification.householdId,
outcome: 'dry-run'
})
continue
}
const claimed = await options.notificationService.claimDueNotification(
notification.notification.id
)
if (!claimed) {
dispatches.push({
notificationId: notification.notification.id,
householdId: notification.notification.householdId,
outcome: 'duplicate'
})
continue
}
try {
await deliver(notification)
await options.notificationService.markNotificationSent(
notification.notification.id,
now
)
dispatches.push({
notificationId: notification.notification.id,
householdId: notification.notification.householdId,
outcome: 'sent'
})
} catch (error) {
await options.notificationService.releaseDueNotification(notification.notification.id)
dispatches.push({
notificationId: notification.notification.id,
householdId: notification.notification.householdId,
outcome: 'failed',
error: error instanceof Error ? error.message : 'Unknown delivery error'
})
}
}
options.logger?.info(
{
event: 'scheduler.ad_hoc_notifications.dispatch',
notificationCount: dispatches.length,
jobId: body.jobId ?? request.headers.get('x-cloudscheduler-jobname') ?? null,
dryRun: body.dryRun === true
},
'Ad hoc notification job completed'
)
return json({
ok: true,
dryRun: body.dryRun === true,
notifications: dispatches
})
} catch (error) {
options.logger?.error(
{
event: 'scheduler.ad_hoc_notifications.failed',
error: error instanceof Error ? error.message : String(error)
},
'Ad hoc notification job failed'
)
return json(
{
ok: false,
error: error instanceof Error ? error.message : 'Unknown error'
},
500
)
}
}
}
}

View File

@@ -1,5 +1,4 @@
import { webhookCallback } from 'grammy'
import type { InlineKeyboardMarkup } from 'grammy/types'
import {
createAdHocNotificationService,
@@ -11,7 +10,7 @@ import {
createLocalePreferenceService,
createMiniAppAdminService,
createPaymentConfirmationService,
createReminderJobService
createScheduledDispatchService
} from '@household/application'
import {
createDbAdHocNotificationRepository,
@@ -19,13 +18,12 @@ import {
createDbFinanceRepository,
createDbHouseholdConfigurationRepository,
createDbProcessedBotMessageRepository,
createDbReminderDispatchRepository,
createDbScheduledDispatchRepository,
createDbTelegramPendingActionRepository,
createDbTopicMessageHistoryRepository
} from '@household/adapters-db'
import { configureLogger, getLogger } from '@household/observability'
import { createAdHocNotificationJobsHandler } from './ad-hoc-notification-jobs'
import { registerAdHocNotifications } from './ad-hoc-notifications'
import { registerAnonymousFeedback } from './anonymous-feedback'
import {
@@ -39,6 +37,8 @@ import { createTelegramBot } from './bot'
import { getBotRuntimeConfig, type BotRuntimeConfig } from './config'
import { registerHouseholdSetupCommands } from './household-setup'
import { HouseholdContextCache } from './household-context-cache'
import { createAwsScheduledDispatchScheduler } from './aws-scheduled-dispatch-scheduler'
import { createGcpScheduledDispatchScheduler } from './gcp-scheduled-dispatch-scheduler'
import { createMiniAppAuthHandler, createMiniAppJoinHandler } from './miniapp-auth'
import {
createMiniAppApproveMemberHandler,
@@ -86,9 +86,9 @@ import {
registerConfiguredPurchaseTopicIngestion
} from './purchase-topic-ingestion'
import { registerConfiguredPaymentTopicIngestion } from './payment-topic-ingestion'
import { createReminderJobsHandler } from './reminder-jobs'
import { registerReminderTopicUtilities } from './reminder-topic-utilities'
import { createSchedulerRequestAuthorizer } from './scheduler-auth'
import { createScheduledDispatchHandler } from './scheduled-dispatch-handler'
import { createBotWebhookServer } from './server'
import { createTopicProcessor } from './topic-processor'
@@ -134,8 +134,42 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
repository: householdConfigurationRepositoryClient.repository
})
: null
const scheduledDispatchRepositoryClient =
runtime.databaseUrl && runtime.scheduledDispatch
? createDbScheduledDispatchRepository(runtime.databaseUrl)
: null
const scheduledDispatchScheduler =
runtime.scheduledDispatch && runtime.schedulerSharedSecret
? runtime.scheduledDispatch.provider === 'gcp-cloud-tasks'
? createGcpScheduledDispatchScheduler({
projectId: runtime.scheduledDispatch.projectId,
location: runtime.scheduledDispatch.location,
queue: runtime.scheduledDispatch.queue,
publicBaseUrl: runtime.scheduledDispatch.publicBaseUrl,
sharedSecret: runtime.schedulerSharedSecret
})
: createAwsScheduledDispatchScheduler({
region: runtime.scheduledDispatch.region,
targetLambdaArn: runtime.scheduledDispatch.targetLambdaArn,
roleArn: runtime.scheduledDispatch.roleArn,
groupName: runtime.scheduledDispatch.groupName
})
: null
const scheduledDispatchService =
scheduledDispatchRepositoryClient &&
scheduledDispatchScheduler &&
householdConfigurationRepositoryClient
? createScheduledDispatchService({
repository: scheduledDispatchRepositoryClient.repository,
scheduler: scheduledDispatchScheduler,
householdConfigurationRepository: householdConfigurationRepositoryClient.repository
})
: null
const miniAppAdminService = householdConfigurationRepositoryClient
? createMiniAppAdminService(householdConfigurationRepositoryClient.repository)
? createMiniAppAdminService(
householdConfigurationRepositoryClient.repository,
scheduledDispatchService ?? undefined
)
: null
const localePreferenceService = householdConfigurationRepositoryClient
? createLocalePreferenceService(householdConfigurationRepositoryClient.repository)
@@ -200,7 +234,12 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
adHocNotificationRepositoryClient && householdConfigurationRepositoryClient
? createAdHocNotificationService({
repository: adHocNotificationRepositoryClient.repository,
householdConfigurationRepository: householdConfigurationRepositoryClient.repository
householdConfigurationRepository: householdConfigurationRepositoryClient.repository,
...(scheduledDispatchService
? {
scheduledDispatchService
}
: {})
})
: null
@@ -289,6 +328,10 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
shutdownTasks.push(adHocNotificationRepositoryClient.close)
}
if (scheduledDispatchRepositoryClient) {
shutdownTasks.push(scheduledDispatchRepositoryClient.close)
}
if (purchaseRepositoryClient && householdConfigurationRepositoryClient) {
registerConfiguredPurchaseTopicIngestion(
bot,
@@ -375,7 +418,8 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
registerHouseholdSetupCommands({
bot,
householdSetupService: createHouseholdSetupService(
householdConfigurationRepositoryClient.repository
householdConfigurationRepositoryClient.repository,
scheduledDispatchService ?? undefined
),
householdAdminService: createHouseholdAdminService(
householdConfigurationRepositoryClient.repository
@@ -419,65 +463,13 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
})
}
const reminderJobs = runtime.reminderJobsEnabled
? (() => {
const reminderRepositoryClient = createDbReminderDispatchRepository(runtime.databaseUrl!)
const reminderService = createReminderJobService(reminderRepositoryClient.repository)
shutdownTasks.push(reminderRepositoryClient.close)
return createReminderJobsHandler({
listReminderTargets: () =>
householdConfigurationRepositoryClient!.repository.listReminderTargets(),
ensureBillingCycle: async ({ householdId, at }) => {
await financeServiceForHousehold(householdId).ensureExpectedCycle(at)
},
releaseReminderDispatch: (input) =>
reminderRepositoryClient.repository.releaseReminderDispatch(input),
sendReminderMessage: async (target, content) => {
const threadId =
target.telegramThreadId !== null ? Number(target.telegramThreadId) : undefined
if (target.telegramThreadId !== null && (!threadId || !Number.isInteger(threadId))) {
throw new Error(
`Invalid reminder thread id for household ${target.householdId}: ${target.telegramThreadId}`
)
}
await bot.api.sendMessage(target.telegramChatId, content.text, {
...(threadId
? {
message_thread_id: threadId
}
: {}),
...(content.replyMarkup
? {
reply_markup: content.replyMarkup as InlineKeyboardMarkup
}
: {})
})
},
reminderService,
...(runtime.miniAppUrl
? {
miniAppUrl: runtime.miniAppUrl
}
: {}),
...(bot.botInfo?.username
? {
botUsername: bot.botInfo.username
}
: {}),
logger: getLogger('scheduler')
})
})()
: null
const adHocNotificationJobs =
runtime.reminderJobsEnabled &&
adHocNotificationService &&
const scheduledDispatchHandler =
scheduledDispatchService &&
adHocNotificationRepositoryClient &&
householdConfigurationRepositoryClient
? createAdHocNotificationJobsHandler({
notificationService: adHocNotificationService,
? createScheduledDispatchHandler({
scheduledDispatchService,
adHocNotificationRepository: adHocNotificationRepositoryClient.repository,
householdConfigurationRepository: householdConfigurationRepositoryClient.repository,
sendTopicMessage: async (input) => {
const threadId = input.threadId ? Number(input.threadId) : undefined
@@ -491,23 +483,38 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
? {
parse_mode: input.parseMode
}
: {}),
...(input.replyMarkup
? {
reply_markup: input.replyMarkup
}
: {})
})
},
sendDirectMessage: async (input) => {
await bot.api.sendMessage(input.telegramUserId, input.text)
},
...(runtime.miniAppUrl
? {
miniAppUrl: runtime.miniAppUrl
}
: {}),
...(bot.botInfo?.username
? {
botUsername: bot.botInfo.username
}
: {}),
logger: getLogger('scheduler')
})
: null
if (!runtime.reminderJobsEnabled) {
if (!scheduledDispatchHandler) {
logger.warn(
{
event: 'runtime.feature_disabled',
feature: 'reminder-jobs'
feature: 'scheduled-dispatch'
},
'Reminder jobs are disabled. Set DATABASE_URL and either SCHEDULER_SHARED_SECRET or SCHEDULER_OIDC_ALLOWED_EMAILS to enable.'
'Scheduled dispatch is disabled. Configure DATABASE_URL, SCHEDULED_DISPATCH_PROVIDER, and scheduler auth to enable reminder delivery.'
)
}
@@ -918,7 +925,7 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
})
: undefined,
scheduler:
(reminderJobs || adHocNotificationJobs) && runtime.schedulerSharedSecret
scheduledDispatchHandler && runtime.schedulerSharedSecret
? {
pathPrefix: '/jobs',
authorize: createSchedulerRequestAuthorizer({
@@ -926,37 +933,25 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
oidcAllowedEmails: runtime.schedulerOidcAllowedEmails
}).authorize,
handler: async (request, jobPath) => {
if (jobPath.startsWith('reminder/')) {
return reminderJobs
? reminderJobs.handle(request, jobPath.slice('reminder/'.length))
: new Response('Not Found', { status: 404 })
}
if (jobPath === 'notifications/due') {
return adHocNotificationJobs
? adHocNotificationJobs.handle(request)
if (jobPath.startsWith('dispatch/')) {
return scheduledDispatchHandler
? scheduledDispatchHandler.handle(request, jobPath.slice('dispatch/'.length))
: new Response('Not Found', { status: 404 })
}
return new Response('Not Found', { status: 404 })
}
}
: reminderJobs || adHocNotificationJobs
: scheduledDispatchHandler
? {
pathPrefix: '/jobs',
authorize: createSchedulerRequestAuthorizer({
oidcAllowedEmails: runtime.schedulerOidcAllowedEmails
}).authorize,
handler: async (request, jobPath) => {
if (jobPath.startsWith('reminder/')) {
return reminderJobs
? reminderJobs.handle(request, jobPath.slice('reminder/'.length))
: new Response('Not Found', { status: 404 })
}
if (jobPath === 'notifications/due') {
return adHocNotificationJobs
? adHocNotificationJobs.handle(request)
if (jobPath.startsWith('dispatch/')) {
return scheduledDispatchHandler
? scheduledDispatchHandler.handle(request, jobPath.slice('dispatch/'.length))
: new Response('Not Found', { status: 404 })
}
@@ -966,6 +961,10 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
: undefined
})
if (scheduledDispatchService) {
await scheduledDispatchService.reconcileAllBuiltInDispatches()
}
return {
fetch: server.fetch,
runtime,

View File

@@ -0,0 +1,46 @@
import { describe, expect, test } from 'bun:test'
import { Temporal } from '@household/domain'
import { createAwsScheduledDispatchScheduler } from './aws-scheduled-dispatch-scheduler'
describe('createAwsScheduledDispatchScheduler', () => {
test('creates one-shot EventBridge schedules targeting the bot lambda', async () => {
const calls: unknown[] = []
const scheduler = createAwsScheduledDispatchScheduler({
region: 'eu-central-1',
targetLambdaArn: 'arn:aws:lambda:eu-central-1:123:function:bot',
roleArn: 'arn:aws:iam::123:role/scheduler',
groupName: 'dispatches',
client: {
send: async (command) => {
calls.push(command.input)
return {}
}
}
})
const result = await scheduler.scheduleOneShotDispatch({
dispatchId: 'dispatch-1',
dueAt: Temporal.Instant.from('2026-03-24T12:00:00Z')
})
expect(result.providerDispatchId).toContain('dispatch-dispatch-1-')
expect(calls[0]).toMatchObject({
GroupName: 'dispatches',
ScheduleExpression: 'at(2026-03-24T12:00:00Z)',
ActionAfterCompletion: 'DELETE',
FlexibleTimeWindow: {
Mode: 'OFF'
},
Target: {
Arn: 'arn:aws:lambda:eu-central-1:123:function:bot',
RoleArn: 'arn:aws:iam::123:role/scheduler',
Input: JSON.stringify({
source: 'household.scheduled-dispatch',
dispatchId: 'dispatch-1'
})
}
})
})
})

View File

@@ -0,0 +1,79 @@
import {
CreateScheduleCommand,
DeleteScheduleCommand,
SchedulerClient
} from '@aws-sdk/client-scheduler'
import type {
ScheduleOneShotDispatchInput,
ScheduleOneShotDispatchResult,
ScheduledDispatchScheduler
} from '@household/ports'
function scheduleName(dispatchId: string): string {
return `dispatch-${dispatchId}-${crypto.randomUUID().slice(0, 8)}`
}
function atExpression(dueAtIso: string): string {
return `at(${dueAtIso.replace(/\.\d{3}Z$/, 'Z')})`
}
export function createAwsScheduledDispatchScheduler(input: {
region: string
targetLambdaArn: string
roleArn: string
groupName: string
client?: Pick<SchedulerClient, 'send'>
}): ScheduledDispatchScheduler {
const client = input.client ?? new SchedulerClient({ region: input.region })
return {
provider: 'aws-eventbridge',
async scheduleOneShotDispatch(
dispatchInput: ScheduleOneShotDispatchInput
): Promise<ScheduleOneShotDispatchResult> {
const name = scheduleName(dispatchInput.dispatchId)
await client.send(
new CreateScheduleCommand({
Name: name,
GroupName: input.groupName,
ScheduleExpression: atExpression(dispatchInput.dueAt.toString()),
FlexibleTimeWindow: {
Mode: 'OFF'
},
ActionAfterCompletion: 'DELETE',
Target: {
Arn: input.targetLambdaArn,
RoleArn: input.roleArn,
Input: JSON.stringify({
source: 'household.scheduled-dispatch',
dispatchId: dispatchInput.dispatchId
})
}
})
)
return {
providerDispatchId: name
}
},
async cancelDispatch(providerDispatchId) {
try {
await client.send(
new DeleteScheduleCommand({
Name: providerDispatchId,
GroupName: input.groupName
})
)
} catch (error) {
const code = (error as { name?: string }).name
if (code === 'ResourceNotFoundException') {
return
}
throw error
}
}
}
}

View File

@@ -13,7 +13,22 @@ export interface BotRuntimeConfig {
miniAppAuthEnabled: boolean
schedulerSharedSecret?: string
schedulerOidcAllowedEmails: readonly string[]
reminderJobsEnabled: boolean
scheduledDispatch?:
| {
provider: 'gcp-cloud-tasks'
publicBaseUrl: string
projectId: string
location: string
queue: string
}
| {
provider: 'aws-eventbridge'
region: string
targetLambdaArn: string
roleArn: string
groupName: string
}
| undefined
openaiApiKey?: string
purchaseParserModel: string
assistantModel: string
@@ -86,6 +101,56 @@ function parseOptionalCsv(value: string | undefined): readonly string[] {
.filter(Boolean)
}
function parseScheduledDispatchConfig(
env: NodeJS.ProcessEnv
): BotRuntimeConfig['scheduledDispatch'] {
const provider = parseOptionalValue(env.SCHEDULED_DISPATCH_PROVIDER)
if (!provider) {
return undefined
}
if (provider === 'gcp-cloud-tasks') {
const publicBaseUrl = parseOptionalValue(env.SCHEDULED_DISPATCH_PUBLIC_BASE_URL)
const projectId = parseOptionalValue(env.GCP_SCHEDULED_DISPATCH_PROJECT_ID)
const location = parseOptionalValue(env.GCP_SCHEDULED_DISPATCH_LOCATION)
const queue = parseOptionalValue(env.GCP_SCHEDULED_DISPATCH_QUEUE)
if (!publicBaseUrl || !projectId || !location || !queue) {
throw new Error(
'GCP scheduled dispatch requires SCHEDULED_DISPATCH_PUBLIC_BASE_URL, GCP_SCHEDULED_DISPATCH_PROJECT_ID, GCP_SCHEDULED_DISPATCH_LOCATION, and GCP_SCHEDULED_DISPATCH_QUEUE'
)
}
return {
provider,
publicBaseUrl,
projectId,
location,
queue
}
}
if (provider === 'aws-eventbridge') {
const region = parseOptionalValue(env.AWS_SCHEDULED_DISPATCH_REGION)
const targetLambdaArn = parseOptionalValue(env.AWS_SCHEDULED_DISPATCH_TARGET_LAMBDA_ARN)
const roleArn = parseOptionalValue(env.AWS_SCHEDULED_DISPATCH_ROLE_ARN)
if (!region || !targetLambdaArn || !roleArn) {
throw new Error(
'AWS scheduled dispatch requires AWS_SCHEDULED_DISPATCH_REGION, AWS_SCHEDULED_DISPATCH_TARGET_LAMBDA_ARN, and AWS_SCHEDULED_DISPATCH_ROLE_ARN'
)
}
return {
provider,
region,
targetLambdaArn,
roleArn,
groupName: parseOptionalValue(env.AWS_SCHEDULED_DISPATCH_GROUP_NAME) ?? 'default'
}
}
throw new Error(`Invalid SCHEDULED_DISPATCH_PROVIDER value: ${provider}`)
}
function parsePositiveInteger(raw: string | undefined, fallback: number, key: string): number {
if (raw === undefined) {
return fallback
@@ -105,6 +170,7 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu
const schedulerOidcAllowedEmails = parseOptionalCsv(env.SCHEDULER_OIDC_ALLOWED_EMAILS)
const miniAppAllowedOrigins = parseOptionalCsv(env.MINI_APP_ALLOWED_ORIGINS)
const miniAppUrl = parseOptionalValue(env.MINI_APP_URL)
const scheduledDispatch = parseScheduledDispatchConfig(env)
const purchaseTopicIngestionEnabled = databaseUrl !== undefined
@@ -112,9 +178,6 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu
const anonymousFeedbackEnabled = databaseUrl !== undefined
const assistantEnabled = databaseUrl !== undefined
const miniAppAuthEnabled = databaseUrl !== undefined
const hasSchedulerOidcConfig = schedulerOidcAllowedEmails.length > 0
const reminderJobsEnabled =
databaseUrl !== undefined && (schedulerSharedSecret !== undefined || hasSchedulerOidcConfig)
const runtime: BotRuntimeConfig = {
port: parsePort(env.PORT),
@@ -129,7 +192,6 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu
miniAppAllowedOrigins,
miniAppAuthEnabled,
schedulerOidcAllowedEmails,
reminderJobsEnabled,
purchaseParserModel: env.PURCHASE_PARSER_MODEL?.trim() || 'gpt-4o-mini',
assistantModel: env.ASSISTANT_MODEL?.trim() || 'gpt-4o-mini',
topicProcessorModel: env.TOPIC_PROCESSOR_MODEL?.trim() || 'gpt-4o-mini',
@@ -176,6 +238,9 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu
if (schedulerSharedSecret !== undefined) {
runtime.schedulerSharedSecret = schedulerSharedSecret
}
if (scheduledDispatch !== undefined) {
runtime.scheduledDispatch = scheduledDispatch
}
if (miniAppUrl !== undefined) {
runtime.miniAppUrl = miniAppUrl
}

View File

@@ -0,0 +1,52 @@
import { describe, expect, test } from 'bun:test'
import { Temporal } from '@household/domain'
import { createGcpScheduledDispatchScheduler } from './gcp-scheduled-dispatch-scheduler'
describe('createGcpScheduledDispatchScheduler', () => {
test('creates Cloud Tasks HTTP tasks for one-shot dispatches', async () => {
const requests: Array<{ url: string; init: RequestInit | undefined }> = []
const scheduler = createGcpScheduledDispatchScheduler({
projectId: 'project-1',
location: 'europe-west1',
queue: 'dispatches',
publicBaseUrl: 'https://bot.example.com',
sharedSecret: 'secret-1',
auth: {
getAccessToken: async () => 'access-token'
},
fetchImpl: (async (url, init) => {
requests.push({
url: String(url),
init
})
return new Response(JSON.stringify({ name: 'tasks/dispatch-1' }), {
status: 200,
headers: {
'content-type': 'application/json'
}
})
}) as typeof fetch
})
const result = await scheduler.scheduleOneShotDispatch({
dispatchId: 'dispatch-1',
dueAt: Temporal.Instant.from('2026-03-24T12:00:00Z')
})
expect(result.providerDispatchId).toBe('tasks/dispatch-1')
expect(requests[0]?.url).toBe(
'https://cloudtasks.googleapis.com/v2/projects/project-1/locations/europe-west1/queues/dispatches/tasks'
)
const payload = JSON.parse(String(requests[0]?.init?.body)) as {
task: {
httpRequest: { url: string; headers: Record<string, string> }
scheduleTime: { seconds: string }
}
}
expect(payload.task.httpRequest.url).toBe('https://bot.example.com/jobs/dispatch/dispatch-1')
expect(payload.task.httpRequest.headers['x-household-scheduler-secret']).toBe('secret-1')
expect(payload.task.scheduleTime.seconds).toBe('1774353600')
})
})

View File

@@ -0,0 +1,122 @@
import { GoogleAuth } from 'google-auth-library'
import type {
ScheduleOneShotDispatchInput,
ScheduleOneShotDispatchResult,
ScheduledDispatchScheduler
} from '@household/ports'
function scheduleTimestamp(input: ScheduleOneShotDispatchInput): {
seconds: string
nanos: number
} {
const milliseconds = input.dueAt.epochMilliseconds
const seconds = Math.floor(milliseconds / 1000)
return {
seconds: String(seconds),
nanos: (milliseconds - seconds * 1000) * 1_000_000
}
}
function callbackUrl(baseUrl: string, dispatchId: string): string {
return `${baseUrl.replace(/\/$/, '')}/jobs/dispatch/${dispatchId}`
}
export function createGcpScheduledDispatchScheduler(input: {
projectId: string
location: string
queue: string
publicBaseUrl: string
sharedSecret: string
auth?: Pick<GoogleAuth, 'getAccessToken'>
fetchImpl?: typeof fetch
}): ScheduledDispatchScheduler {
const auth =
input.auth ??
new GoogleAuth({
scopes: ['https://www.googleapis.com/auth/cloud-platform']
})
const fetchImpl = input.fetchImpl ?? fetch
async function authorizedHeaders() {
const accessToken = await auth.getAccessToken()
if (!accessToken) {
throw new Error('Failed to acquire Google Cloud access token for scheduled dispatch')
}
const token =
typeof accessToken === 'string'
? accessToken
: ((accessToken as { token?: string }).token ?? null)
if (!token) {
throw new Error('Failed to read Google Cloud access token for scheduled dispatch')
}
return {
authorization: `Bearer ${token}`,
'content-type': 'application/json'
}
}
return {
provider: 'gcp-cloud-tasks',
async scheduleOneShotDispatch(dispatchInput): Promise<ScheduleOneShotDispatchResult> {
const response = await fetchImpl(
`https://cloudtasks.googleapis.com/v2/projects/${input.projectId}/locations/${input.location}/queues/${input.queue}/tasks`,
{
method: 'POST',
headers: await authorizedHeaders(),
body: JSON.stringify({
task: {
scheduleTime: scheduleTimestamp(dispatchInput),
httpRequest: {
httpMethod: 'POST',
url: callbackUrl(input.publicBaseUrl, dispatchInput.dispatchId),
headers: {
'content-type': 'application/json',
'x-household-scheduler-secret': input.sharedSecret
},
body: Buffer.from(
JSON.stringify({
dispatchId: dispatchInput.dispatchId
})
).toString('base64')
}
}
})
}
)
if (!response.ok) {
throw new Error(`Cloud Tasks create task failed with status ${response.status}`)
}
const payload = (await response.json()) as { name?: string }
if (!payload.name) {
throw new Error('Cloud Tasks create task response did not include a task name')
}
return {
providerDispatchId: payload.name
}
},
async cancelDispatch(providerDispatchId) {
const response = await fetchImpl(
`https://cloudtasks.googleapis.com/v2/${providerDispatchId}`,
{
method: 'DELETE',
headers: await authorizedHeaders()
}
)
if (response.status === 404) {
return
}
if (!response.ok) {
throw new Error(`Cloud Tasks delete task failed with status ${response.status}`)
}
}
}
}

View File

@@ -10,7 +10,58 @@ import {
const appPromise = createBotRuntimeApp()
const logger = getLogger('lambda')
export async function handler(event: LambdaFunctionUrlRequest): Promise<LambdaFunctionUrlResponse> {
interface ScheduledDispatchLambdaEvent {
source: 'household.scheduled-dispatch'
dispatchId: string
}
function isScheduledDispatchLambdaEvent(value: unknown): value is ScheduledDispatchLambdaEvent {
if (!value || typeof value !== 'object') {
return false
}
const candidate = value as Record<string, unknown>
return (
candidate.source === 'household.scheduled-dispatch' && typeof candidate.dispatchId === 'string'
)
}
async function handleScheduledDispatchEvent(
event: ScheduledDispatchLambdaEvent
): Promise<LambdaFunctionUrlResponse> {
const app = await appPromise
const secret = process.env.SCHEDULER_SHARED_SECRET
const response = await app.fetch(
new Request(`https://lambda.internal/jobs/dispatch/${event.dispatchId}`, {
method: 'POST',
headers: secret
? {
'x-household-scheduler-secret': secret
}
: undefined,
body: JSON.stringify({
dispatchId: event.dispatchId
})
})
)
return {
statusCode: response.status,
headers: {
'content-type': response.headers.get('content-type') ?? 'application/json; charset=utf-8'
},
body: await response.text()
}
}
export async function handler(
event: LambdaFunctionUrlRequest | ScheduledDispatchLambdaEvent
): Promise<LambdaFunctionUrlResponse> {
if (isScheduledDispatchLambdaEvent(event)) {
return handleScheduledDispatchEvent(event)
}
const app = await appPromise
return handleLambdaFunctionUrlEvent(event, app.fetch)
}
@@ -76,7 +127,9 @@ async function runtimeLoop(): Promise<void> {
}
try {
const event = (await invocation.json()) as LambdaFunctionUrlRequest
const event = (await invocation.json()) as
| LambdaFunctionUrlRequest
| ScheduledDispatchLambdaEvent
const response = await handler(event)
await postRuntimeResponse(requestId, response)
} catch (error) {

View File

@@ -1,331 +0,0 @@
import { describe, expect, mock, test } from 'bun:test'
import type { ReminderJobResult, ReminderJobService } from '@household/application'
import { Temporal } from '@household/domain'
import type { ReminderTarget } from '@household/ports'
import { createReminderJobsHandler } from './reminder-jobs'
const target: ReminderTarget = {
householdId: 'household-1',
householdName: 'Kojori House',
telegramChatId: '-1001',
telegramThreadId: '12',
locale: 'ru',
timezone: 'Asia/Tbilisi',
rentDueDay: 20,
rentWarningDay: 17,
utilitiesDueDay: 4,
utilitiesReminderDay: 3
}
const fixedNow = Temporal.Instant.from('2026-03-03T09:00:00Z')
describe('createReminderJobsHandler', () => {
test('returns per-household dispatch outcome with Telegram delivery metadata', async () => {
const claimedResult: ReminderJobResult = {
status: 'claimed',
dedupeKey: '2026-03:utilities',
payloadHash: 'hash',
reminderType: 'utilities',
period: '2026-03',
messageText: 'Utilities reminder for 2026-03'
}
const reminderService: ReminderJobService = {
handleJob: mock(async () => claimedResult)
}
const sendReminderMessage = mock(async () => {})
const handler = createReminderJobsHandler({
listReminderTargets: async () => [target],
releaseReminderDispatch: mock(async () => {}),
sendReminderMessage,
reminderService,
now: () => fixedNow,
botUsername: 'household_test_bot'
})
const response = await handler.handle(
new Request('http://localhost/jobs/reminder/utilities', {
method: 'POST',
body: JSON.stringify({
period: '2026-03',
jobId: 'job-1'
})
}),
'utilities'
)
expect(sendReminderMessage).toHaveBeenCalledTimes(1)
expect(sendReminderMessage).toHaveBeenCalledWith(
target,
expect.objectContaining({
text: 'Напоминание по коммунальным платежам за 2026-03',
replyMarkup: {
inline_keyboard: [
[
{
text: 'Ввести по шагам',
callback_data: 'reminder_util:guided'
},
{
text: 'Шаблон',
callback_data: 'reminder_util:template'
}
],
[
{
text: 'Открыть дашборд',
url: 'https://t.me/household_test_bot?start=dashboard'
}
]
]
}
})
)
expect(response.status).toBe(200)
expect(await response.json()).toEqual({
ok: true,
jobId: 'job-1',
reminderType: 'utilities',
period: '2026-03',
dryRun: false,
totals: {
targets: 1,
claimed: 1,
duplicate: 0,
'dry-run': 0,
failed: 0
},
dispatches: [
{
householdId: 'household-1',
householdName: 'Kojori House',
telegramChatId: '-1001',
telegramThreadId: '12',
period: '2026-03',
dedupeKey: '2026-03:utilities',
outcome: 'claimed',
messageText: 'Напоминание по коммунальным платежам за 2026-03'
}
]
})
})
test('supports forced dry-run mode without posting to Telegram', async () => {
const dryRunResult: ReminderJobResult = {
status: 'dry-run',
dedupeKey: '2026-03:rent-warning',
payloadHash: 'hash',
reminderType: 'rent-warning',
period: '2026-03',
messageText: 'Rent reminder for 2026-03: payment is coming up soon.'
}
const reminderService: ReminderJobService = {
handleJob: mock(async () => dryRunResult)
}
const sendReminderMessage = mock(async () => {})
const handler = createReminderJobsHandler({
listReminderTargets: async () => [target],
releaseReminderDispatch: mock(async () => {}),
sendReminderMessage,
reminderService,
forceDryRun: true,
now: () => fixedNow
})
const response = await handler.handle(
new Request('http://localhost/jobs/reminder/rent-warning', {
method: 'POST',
body: JSON.stringify({ period: '2026-03', jobId: 'job-2' })
}),
'rent-warning'
)
expect(sendReminderMessage).toHaveBeenCalledTimes(0)
expect(response.status).toBe(200)
expect(await response.json()).toMatchObject({
dryRun: true,
totals: {
targets: 1,
claimed: 0,
duplicate: 0,
'dry-run': 1,
failed: 0
}
})
})
test('releases a dispatch claim when Telegram delivery fails', async () => {
const failedResult: ReminderJobResult = {
status: 'claimed',
dedupeKey: '2026-03:rent-due',
payloadHash: 'hash',
reminderType: 'rent-due',
period: '2026-03',
messageText: 'Rent due reminder for 2026-03: please settle payment today.'
}
const reminderService: ReminderJobService = {
handleJob: mock(async () => failedResult)
}
const releaseReminderDispatch = mock(async () => {})
const handler = createReminderJobsHandler({
listReminderTargets: async () => [target],
releaseReminderDispatch,
sendReminderMessage: mock(async () => {
throw new Error('Telegram unavailable')
}),
reminderService,
now: () => fixedNow
})
const response = await handler.handle(
new Request('http://localhost/jobs/reminder/rent-due', {
method: 'POST',
body: JSON.stringify({ period: '2026-03' })
}),
'rent-due'
)
expect(releaseReminderDispatch).toHaveBeenCalledWith({
householdId: 'household-1',
period: '2026-03',
reminderType: 'rent-due'
})
expect(response.status).toBe(200)
expect(await response.json()).toMatchObject({
totals: {
failed: 1
},
dispatches: [
expect.objectContaining({
outcome: 'failed',
error: 'Telegram unavailable'
})
]
})
})
test('rejects unsupported reminder type', async () => {
const handler = createReminderJobsHandler({
listReminderTargets: async () => [target],
releaseReminderDispatch: mock(async () => {}),
sendReminderMessage: mock(async () => {}),
reminderService: {
handleJob: mock(async () => {
throw new Error('should not be called')
})
},
now: () => fixedNow
})
const response = await handler.handle(
new Request('http://localhost/jobs/reminder/unknown', {
method: 'POST',
body: JSON.stringify({ period: '2026-03' })
}),
'unknown'
)
expect(response.status).toBe(400)
expect(await response.json()).toEqual({
ok: false,
error: 'Invalid reminder type'
})
})
test('skips households whose configured reminder day does not match today when no period override is supplied', async () => {
const reminderService: ReminderJobService = {
handleJob: mock(async () => {
throw new Error('should not be called')
})
}
const handler = createReminderJobsHandler({
listReminderTargets: async () => [
{
...target,
utilitiesReminderDay: 31
}
],
releaseReminderDispatch: mock(async () => {}),
sendReminderMessage: mock(async () => {}),
reminderService,
now: () => fixedNow
})
const response = await handler.handle(
new Request('http://localhost/jobs/reminder/utilities', {
method: 'POST',
body: JSON.stringify({ jobId: 'job-3' })
}),
'utilities'
)
expect(response.status).toBe(200)
expect(await response.json()).toMatchObject({
ok: true,
totals: {
targets: 0
},
dispatches: []
})
})
test('honors explicit period overrides even when today is not the configured reminder day', async () => {
const dryRunResult: ReminderJobResult = {
status: 'dry-run',
dedupeKey: '2026-03:rent-due',
payloadHash: 'hash',
reminderType: 'rent-due',
period: '2026-03',
messageText: 'Rent due reminder for 2026-03: please settle payment today.'
}
const reminderService: ReminderJobService = {
handleJob: mock(async () => dryRunResult)
}
const handler = createReminderJobsHandler({
listReminderTargets: async () => [
{
...target,
rentDueDay: 20
}
],
releaseReminderDispatch: mock(async () => {}),
sendReminderMessage: mock(async () => {}),
reminderService,
now: () => fixedNow
})
const response = await handler.handle(
new Request('http://localhost/jobs/reminder/rent-due', {
method: 'POST',
body: JSON.stringify({ period: '2026-03', dryRun: true })
}),
'rent-due'
)
expect(response.status).toBe(200)
expect(await response.json()).toMatchObject({
ok: true,
totals: {
targets: 1,
'dry-run': 1
},
dispatches: [
expect.objectContaining({
householdId: 'household-1',
period: '2026-03',
outcome: 'dry-run'
})
]
})
})
})

View File

@@ -1,299 +0,0 @@
import type { ReminderJobService } from '@household/application'
import { BillingPeriod, Temporal, nowInstant } from '@household/domain'
import type { Logger } from '@household/observability'
import { REMINDER_TYPES, type ReminderTarget, type ReminderType } from '@household/ports'
import type { InlineKeyboardMarkup } from 'grammy/types'
import { getBotTranslations } from './i18n'
import { buildUtilitiesReminderReplyMarkup } from './reminder-topic-utilities'
interface ReminderJobRequestBody {
period?: string
jobId?: string
dryRun?: boolean
}
export interface ReminderMessageContent {
text: string
replyMarkup?: InlineKeyboardMarkup
}
function json(body: object, status = 200): Response {
return new Response(JSON.stringify(body), {
status,
headers: {
'content-type': 'application/json; charset=utf-8'
}
})
}
function parseReminderType(raw: string): ReminderType | null {
if ((REMINDER_TYPES as readonly string[]).includes(raw)) {
return raw as ReminderType
}
return null
}
function currentPeriod(): string {
return BillingPeriod.fromInstant(nowInstant()).toString()
}
function targetLocalDate(target: ReminderTarget, instant: Temporal.Instant) {
return instant.toZonedDateTimeISO(target.timezone)
}
function isReminderDueToday(
target: ReminderTarget,
reminderType: ReminderType,
instant: Temporal.Instant
): boolean {
const currentDay = targetLocalDate(target, instant).day
switch (reminderType) {
case 'utilities':
return currentDay === target.utilitiesReminderDay
case 'rent-warning':
return currentDay === target.rentWarningDay
case 'rent-due':
return currentDay === target.rentDueDay
}
}
function targetPeriod(target: ReminderTarget, instant: Temporal.Instant): string {
const localDate = targetLocalDate(target, instant)
return BillingPeriod.fromString(
`${localDate.year}-${String(localDate.month).padStart(2, '0')}`
).toString()
}
async function readBody(request: Request): Promise<ReminderJobRequestBody> {
const text = await request.text()
if (text.trim().length === 0) {
return {}
}
try {
return JSON.parse(text) as ReminderJobRequestBody
} catch {
throw new Error('Invalid JSON body')
}
}
export function createReminderJobsHandler(options: {
listReminderTargets: () => Promise<readonly ReminderTarget[]>
ensureBillingCycle?: (input: { householdId: string; at: Temporal.Instant }) => Promise<void>
releaseReminderDispatch: (input: {
householdId: string
period: string
reminderType: ReminderType
}) => Promise<void>
sendReminderMessage: (target: ReminderTarget, content: ReminderMessageContent) => Promise<void>
reminderService: ReminderJobService
forceDryRun?: boolean
now?: () => Temporal.Instant
miniAppUrl?: string
botUsername?: string
logger?: Logger
}): {
handle: (request: Request, rawReminderType: string) => Promise<Response>
} {
function messageContent(
target: ReminderTarget,
reminderType: ReminderType,
period: string
): ReminderMessageContent {
const t = getBotTranslations(target.locale).reminders
switch (reminderType) {
case 'utilities':
return {
text: t.utilities(period),
replyMarkup: buildUtilitiesReminderReplyMarkup(target.locale, {
...(options.miniAppUrl
? {
miniAppUrl: options.miniAppUrl
}
: {}),
...(options.botUsername
? {
botUsername: options.botUsername
}
: {})
})
}
case 'rent-warning':
return {
text: t.rentWarning(period),
replyMarkup: {
inline_keyboard: [
[
{
text: t.openDashboardButton,
url: options.botUsername
? `https://t.me/${options.botUsername}?start=dashboard`
: '#'
}
]
]
}
}
case 'rent-due':
return {
text: t.rentDue(period),
replyMarkup: {
inline_keyboard: [
[
{
text: t.openDashboardButton,
url: options.botUsername
? `https://t.me/${options.botUsername}?start=dashboard`
: '#'
}
]
]
}
}
}
}
return {
handle: async (request, rawReminderType) => {
const reminderType = parseReminderType(rawReminderType)
if (!reminderType) {
return json({ ok: false, error: 'Invalid reminder type' }, 400)
}
try {
const body = await readBody(request)
const schedulerJobName = request.headers.get('x-cloudscheduler-jobname')
const requestedPeriod = body.period
? BillingPeriod.fromString(body.period).toString()
: null
const defaultPeriod = requestedPeriod ?? currentPeriod()
const dryRun = options.forceDryRun === true || body.dryRun === true
const currentInstant = options.now?.() ?? nowInstant()
const targets = await options.listReminderTargets()
const dispatches: Array<{
householdId: string
householdName: string
telegramChatId: string
telegramThreadId: string | null
period: string
dedupeKey: string
outcome: 'dry-run' | 'claimed' | 'duplicate' | 'failed'
messageText: string
error?: string
}> = []
for (const target of targets) {
await options.ensureBillingCycle?.({
householdId: target.householdId,
at: currentInstant
})
if (!requestedPeriod && !isReminderDueToday(target, reminderType, currentInstant)) {
continue
}
const period = requestedPeriod ?? targetPeriod(target, currentInstant)
const result = await options.reminderService.handleJob({
householdId: target.householdId,
period,
reminderType,
dryRun
})
const content = messageContent(target, reminderType, period)
let outcome: 'dry-run' | 'claimed' | 'duplicate' | 'failed' = result.status
let error: string | undefined
if (result.status === 'claimed') {
try {
await options.sendReminderMessage(target, content)
} catch (dispatchError) {
await options.releaseReminderDispatch({
householdId: target.householdId,
period,
reminderType
})
outcome = 'failed'
error =
dispatchError instanceof Error
? dispatchError.message
: 'Unknown reminder delivery error'
}
}
options.logger?.info(
{
event: 'scheduler.reminder.dispatch',
reminderType,
period,
jobId: body.jobId ?? schedulerJobName ?? null,
householdId: target.householdId,
householdName: target.householdName,
dedupeKey: result.dedupeKey,
outcome,
dryRun,
...(error ? { error } : {})
},
'Reminder job processed'
)
dispatches.push({
householdId: target.householdId,
householdName: target.householdName,
telegramChatId: target.telegramChatId,
telegramThreadId: target.telegramThreadId,
period,
dedupeKey: result.dedupeKey,
outcome,
messageText: content.text,
...(error ? { error } : {})
})
}
const totals = dispatches.reduce(
(summary, dispatch) => {
summary.targets += 1
summary[dispatch.outcome] += 1
return summary
},
{
targets: 0,
claimed: 0,
duplicate: 0,
'dry-run': 0,
failed: 0
}
)
return json({
ok: true,
jobId: body.jobId ?? schedulerJobName ?? null,
reminderType,
period: defaultPeriod,
dryRun,
totals,
dispatches
})
} catch (error) {
const message = error instanceof Error ? error.message : 'Unknown reminder job error'
options.logger?.error(
{
event: 'scheduler.reminder.dispatch_failed',
reminderType: rawReminderType,
error: message
},
'Reminder job failed'
)
return json({ ok: false, error: message }, 400)
}
}
}
}

View File

@@ -0,0 +1,218 @@
import { describe, expect, test } from 'bun:test'
import type { ScheduledDispatchService } from '@household/application'
import { Temporal } from '@household/domain'
import type {
AdHocNotificationRecord,
HouseholdMemberRecord,
HouseholdTelegramChatRecord,
HouseholdTopicBindingRecord,
ScheduledDispatchRecord
} from '@household/ports'
import { createScheduledDispatchHandler } from './scheduled-dispatch-handler'
function scheduledDispatch(
input: Partial<ScheduledDispatchRecord> &
Pick<ScheduledDispatchRecord, 'id' | 'householdId' | 'kind'>
): ScheduledDispatchRecord {
return {
id: input.id,
householdId: input.householdId,
kind: input.kind,
dueAt: input.dueAt ?? Temporal.Now.instant().subtract({ minutes: 1 }),
timezone: input.timezone ?? 'Asia/Tbilisi',
status: input.status ?? 'scheduled',
provider: input.provider ?? 'gcp-cloud-tasks',
providerDispatchId: input.providerDispatchId ?? 'provider-1',
adHocNotificationId: input.adHocNotificationId ?? null,
period: input.period ?? null,
sentAt: input.sentAt ?? null,
cancelledAt: input.cancelledAt ?? null,
createdAt: input.createdAt ?? Temporal.Instant.from('2026-03-24T00:00:00Z'),
updatedAt: input.updatedAt ?? Temporal.Instant.from('2026-03-24T00:00:00Z')
}
}
function notification(input: Partial<AdHocNotificationRecord> = {}): AdHocNotificationRecord {
return {
id: input.id ?? 'notif-1',
householdId: input.householdId ?? 'household-1',
creatorMemberId: input.creatorMemberId ?? 'creator',
assigneeMemberId: input.assigneeMemberId ?? null,
originalRequestText: 'raw',
notificationText: input.notificationText ?? 'Reminder text',
timezone: input.timezone ?? 'Asia/Tbilisi',
scheduledFor: input.scheduledFor ?? Temporal.Now.instant().subtract({ minutes: 1 }),
timePrecision: input.timePrecision ?? 'exact',
deliveryMode: input.deliveryMode ?? 'topic',
dmRecipientMemberIds: input.dmRecipientMemberIds ?? [],
friendlyTagAssignee: input.friendlyTagAssignee ?? false,
status: input.status ?? 'scheduled',
sourceTelegramChatId: input.sourceTelegramChatId ?? null,
sourceTelegramThreadId: input.sourceTelegramThreadId ?? null,
sentAt: input.sentAt ?? null,
cancelledAt: input.cancelledAt ?? null,
cancelledByMemberId: input.cancelledByMemberId ?? null,
createdAt: input.createdAt ?? Temporal.Instant.from('2026-03-24T00:00:00Z'),
updatedAt: input.updatedAt ?? Temporal.Instant.from('2026-03-24T00:00:00Z')
}
}
describe('createScheduledDispatchHandler', () => {
test('delivers ad hoc topic notifications exactly once and marks them sent', async () => {
const dispatch = scheduledDispatch({
id: 'dispatch-1',
householdId: 'household-1',
kind: 'ad_hoc_notification',
adHocNotificationId: 'notif-1'
})
const sentTopicMessages: string[] = []
const markedNotifications: string[] = []
const markedDispatches: string[] = []
const service: ScheduledDispatchService = {
scheduleAdHocNotification: async () => dispatch,
cancelAdHocNotification: async () => {},
reconcileHouseholdBuiltInDispatches: async () => {},
reconcileAllBuiltInDispatches: async () => {},
getDispatchById: async () => dispatch,
claimDispatch: async () => true,
releaseDispatch: async () => {},
markDispatchSent: async (dispatchId) => {
markedDispatches.push(dispatchId)
return dispatch
}
}
const handler = createScheduledDispatchHandler({
scheduledDispatchService: service,
adHocNotificationRepository: {
async getNotificationById() {
return notification({
id: 'notif-1',
scheduledFor: dispatch.dueAt,
notificationText: 'Dima, reminder landed.'
})
},
async markNotificationSent(notificationId) {
markedNotifications.push(notificationId)
return null
}
},
householdConfigurationRepository: {
async getHouseholdChatByHouseholdId(): Promise<HouseholdTelegramChatRecord | null> {
return {
householdId: 'household-1',
householdName: 'Kojori',
telegramChatId: 'chat-1',
telegramChatType: 'supergroup',
title: 'Kojori',
defaultLocale: 'ru'
}
},
async getHouseholdTopicBinding(): Promise<HouseholdTopicBindingRecord | null> {
return {
householdId: 'household-1',
role: 'reminders',
telegramThreadId: '103',
topicName: 'Reminders'
}
},
async getHouseholdBillingSettings() {
throw new Error('not used')
},
async listHouseholdMembers(): Promise<readonly HouseholdMemberRecord[]> {
return []
}
},
sendTopicMessage: async (input) => {
sentTopicMessages.push(`${input.chatId}:${input.threadId}:${input.text}`)
},
sendDirectMessage: async () => {
throw new Error('not used')
}
})
const response = await handler.handle(
new Request('http://localhost/jobs/dispatch/dispatch-1', { method: 'POST' }),
'dispatch-1'
)
const payload = (await response.json()) as { ok: boolean; outcome: string }
expect(payload.ok).toBe(true)
expect(payload.outcome).toBe('sent')
expect(sentTopicMessages).toEqual(['chat-1:103:Dima, reminder landed.'])
expect(markedNotifications).toEqual(['notif-1'])
expect(markedDispatches).toEqual(['dispatch-1'])
})
test('ignores stale ad hoc dispatch callbacks after a reschedule', async () => {
const dispatch = scheduledDispatch({
id: 'dispatch-1',
householdId: 'household-1',
kind: 'ad_hoc_notification',
adHocNotificationId: 'notif-1',
dueAt: Temporal.Instant.from('2026-03-24T08:00:00Z')
})
let released = false
const service: ScheduledDispatchService = {
scheduleAdHocNotification: async () => dispatch,
cancelAdHocNotification: async () => {},
reconcileHouseholdBuiltInDispatches: async () => {},
reconcileAllBuiltInDispatches: async () => {},
getDispatchById: async () => dispatch,
claimDispatch: async () => true,
releaseDispatch: async () => {
released = true
},
markDispatchSent: async () => dispatch
}
const handler = createScheduledDispatchHandler({
scheduledDispatchService: service,
adHocNotificationRepository: {
async getNotificationById() {
return notification({
id: 'notif-1',
scheduledFor: Temporal.Instant.from('2026-03-24T09:00:00Z')
})
},
async markNotificationSent() {
throw new Error('not used')
}
},
householdConfigurationRepository: {
async getHouseholdChatByHouseholdId() {
return null
},
async getHouseholdTopicBinding() {
return null
},
async getHouseholdBillingSettings() {
throw new Error('not used')
},
async listHouseholdMembers(): Promise<readonly HouseholdMemberRecord[]> {
return []
}
},
sendTopicMessage: async () => {
throw new Error('should not send')
},
sendDirectMessage: async () => {
throw new Error('should not send')
}
})
const response = await handler.handle(
new Request('http://localhost/jobs/dispatch/dispatch-1', { method: 'POST' }),
'dispatch-1'
)
const payload = (await response.json()) as { ok: boolean; outcome: string }
expect(payload.ok).toBe(true)
expect(payload.outcome).toBe('stale')
expect(released).toBe(true)
})
})

View File

@@ -0,0 +1,292 @@
import type { ScheduledDispatchService } from '@household/application'
import { BillingPeriod, nowInstant } from '@household/domain'
import type { Logger } from '@household/observability'
import type {
AdHocNotificationRepository,
HouseholdConfigurationRepository,
HouseholdMemberRecord,
ReminderType
} from '@household/ports'
import type { InlineKeyboardMarkup } from 'grammy/types'
import { buildTopicNotificationText } from './ad-hoc-notifications'
import { buildScheduledReminderMessageContent } from './scheduled-reminder-content'
function json(body: object, status = 200): Response {
return new Response(JSON.stringify(body), {
status,
headers: {
'content-type': 'application/json; charset=utf-8'
}
})
}
function builtInReminderType(kind: 'utilities' | 'rent_warning' | 'rent_due'): ReminderType {
switch (kind) {
case 'utilities':
return 'utilities'
case 'rent_warning':
return 'rent-warning'
case 'rent_due':
return 'rent-due'
}
}
export function createScheduledDispatchHandler(options: {
scheduledDispatchService: ScheduledDispatchService
adHocNotificationRepository: Pick<
AdHocNotificationRepository,
'getNotificationById' | 'markNotificationSent'
>
householdConfigurationRepository: Pick<
HouseholdConfigurationRepository,
| 'getHouseholdChatByHouseholdId'
| 'getHouseholdTopicBinding'
| 'getHouseholdBillingSettings'
| 'listHouseholdMembers'
>
sendTopicMessage: (input: {
householdId: string
chatId: string
threadId: string | null
text: string
parseMode?: 'HTML'
replyMarkup?: InlineKeyboardMarkup
}) => Promise<void>
sendDirectMessage: (input: { telegramUserId: string; text: string }) => Promise<void>
miniAppUrl?: string
botUsername?: string
logger?: Logger
}): {
handle: (request: Request, dispatchId: string) => Promise<Response>
} {
async function sendAdHocNotification(dispatchId: string) {
const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId)
if (
!dispatch ||
dispatch.kind !== 'ad_hoc_notification' ||
!dispatch.adHocNotificationId ||
dispatch.status !== 'scheduled'
) {
return { outcome: 'noop' as const }
}
const currentNow = nowInstant()
if (dispatch.dueAt.epochMilliseconds > currentNow.epochMilliseconds) {
return { outcome: 'not_due' as const }
}
const claimed = await options.scheduledDispatchService.claimDispatch(dispatch.id)
if (!claimed) {
return { outcome: 'duplicate' as const }
}
try {
const notification = await options.adHocNotificationRepository.getNotificationById(
dispatch.adHocNotificationId
)
if (!notification || notification.status !== 'scheduled') {
await options.scheduledDispatchService.markDispatchSent(dispatch.id, currentNow)
return { outcome: 'noop' as const }
}
if (notification.scheduledFor.epochMilliseconds !== dispatch.dueAt.epochMilliseconds) {
await options.scheduledDispatchService.releaseDispatch(dispatch.id)
return { outcome: 'stale' as const }
}
if (notification.deliveryMode === 'topic') {
const householdChat =
notification.sourceTelegramChatId ??
(
await options.householdConfigurationRepository.getHouseholdChatByHouseholdId(
notification.householdId
)
)?.telegramChatId
const threadId =
notification.sourceTelegramThreadId ??
(
await options.householdConfigurationRepository.getHouseholdTopicBinding(
notification.householdId,
'reminders'
)
)?.telegramThreadId ??
null
if (!householdChat) {
throw new Error(`Household chat not configured for ${notification.householdId}`)
}
const content = buildTopicNotificationText({
notificationText: notification.notificationText
})
await options.sendTopicMessage({
householdId: notification.householdId,
chatId: householdChat,
threadId,
text: content.text,
parseMode: content.parseMode
})
} else {
const members = await options.householdConfigurationRepository.listHouseholdMembers(
notification.householdId
)
const dmRecipients = notification.dmRecipientMemberIds
.map((memberId) => members.find((member) => member.id === memberId))
.filter((member): member is HouseholdMemberRecord => Boolean(member))
for (const recipient of dmRecipients) {
await options.sendDirectMessage({
telegramUserId: recipient.telegramUserId,
text: notification.notificationText
})
}
}
await options.adHocNotificationRepository.markNotificationSent(notification.id, currentNow)
await options.scheduledDispatchService.markDispatchSent(dispatch.id, currentNow)
return { outcome: 'sent' as const }
} catch (error) {
await options.scheduledDispatchService.releaseDispatch(dispatch.id)
throw error
}
}
async function sendBuiltInReminder(dispatchId: string) {
const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId)
if (
!dispatch ||
dispatch.status !== 'scheduled' ||
(dispatch.kind !== 'utilities' &&
dispatch.kind !== 'rent_warning' &&
dispatch.kind !== 'rent_due')
) {
return { outcome: 'noop' as const }
}
const currentNow = nowInstant()
if (dispatch.dueAt.epochMilliseconds > currentNow.epochMilliseconds) {
return { outcome: 'not_due' as const }
}
const claimed = await options.scheduledDispatchService.claimDispatch(dispatch.id)
if (!claimed) {
return { outcome: 'duplicate' as const }
}
try {
const [chat, reminderTopic] = await Promise.all([
options.householdConfigurationRepository.getHouseholdChatByHouseholdId(
dispatch.householdId
),
options.householdConfigurationRepository.getHouseholdTopicBinding(
dispatch.householdId,
'reminders'
)
])
if (!chat) {
await options.scheduledDispatchService.markDispatchSent(dispatch.id, currentNow)
return { outcome: 'noop' as const }
}
const content = buildScheduledReminderMessageContent({
locale: chat.defaultLocale,
reminderType: builtInReminderType(dispatch.kind),
period:
dispatch.period ??
BillingPeriod.fromInstant(
dispatch.dueAt.toZonedDateTimeISO(dispatch.timezone).toInstant()
).toString(),
...(options.miniAppUrl
? {
miniAppUrl: options.miniAppUrl
}
: {}),
...(options.botUsername
? {
botUsername: options.botUsername
}
: {})
})
await options.sendTopicMessage({
householdId: dispatch.householdId,
chatId: chat.telegramChatId,
threadId: reminderTopic?.telegramThreadId ?? null,
text: content.text,
...(content.replyMarkup
? {
replyMarkup: content.replyMarkup
}
: {})
})
await options.scheduledDispatchService.markDispatchSent(dispatch.id, currentNow)
await options.scheduledDispatchService.reconcileHouseholdBuiltInDispatches(
dispatch.householdId,
currentNow.add({ seconds: 1 })
)
return {
outcome: 'sent' as const
}
} catch (error) {
await options.scheduledDispatchService.releaseDispatch(dispatch.id)
throw error
}
}
return {
handle: async (_request, dispatchId) => {
try {
const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId)
if (!dispatch) {
return json({
ok: true,
dispatchId,
outcome: 'noop'
})
}
const result =
dispatch.kind === 'ad_hoc_notification'
? await sendAdHocNotification(dispatchId)
: await sendBuiltInReminder(dispatchId)
options.logger?.info(
{
event: 'scheduler.scheduled_dispatch.handle',
dispatchId,
householdId: dispatch.householdId,
kind: dispatch.kind,
outcome: result.outcome
},
'Scheduled dispatch handled'
)
return json({
ok: true,
dispatchId,
outcome: result.outcome
})
} catch (error) {
options.logger?.error(
{
event: 'scheduler.scheduled_dispatch.failed',
dispatchId,
error: error instanceof Error ? error.message : String(error)
},
'Scheduled dispatch failed'
)
return json(
{
ok: false,
dispatchId,
error: error instanceof Error ? error.message : 'Unknown error'
},
500
)
}
}
}
}

View File

@@ -0,0 +1,70 @@
import type { ReminderType } from '@household/ports'
import type { InlineKeyboardMarkup } from 'grammy/types'
import { getBotTranslations } from './i18n'
import type { BotLocale } from './i18n'
import { buildUtilitiesReminderReplyMarkup } from './reminder-topic-utilities'
export interface ScheduledReminderMessageContent {
text: string
replyMarkup?: InlineKeyboardMarkup
}
export function buildScheduledReminderMessageContent(input: {
locale: BotLocale
reminderType: ReminderType
period: string
miniAppUrl?: string
botUsername?: string
}): ScheduledReminderMessageContent {
const t = getBotTranslations(input.locale).reminders
const dashboardReplyMarkup = input.botUsername
? ({
inline_keyboard: [
[
{
text: t.openDashboardButton,
url: `https://t.me/${input.botUsername}?start=dashboard`
}
]
]
} satisfies InlineKeyboardMarkup)
: null
switch (input.reminderType) {
case 'utilities':
return {
text: t.utilities(input.period),
replyMarkup: buildUtilitiesReminderReplyMarkup(input.locale, {
...(input.miniAppUrl
? {
miniAppUrl: input.miniAppUrl
}
: {}),
...(input.botUsername
? {
botUsername: input.botUsername
}
: {})
})
}
case 'rent-warning':
return {
text: t.rentWarning(input.period),
...(dashboardReplyMarkup
? {
replyMarkup: dashboardReplyMarkup
}
: {})
}
case 'rent-due':
return {
text: t.rentDue(input.period),
...(dashboardReplyMarkup
? {
replyMarkup: dashboardReplyMarkup
}
: {})
}
}
}

View File

@@ -9,7 +9,7 @@ describe('createSchedulerRequestAuthorizer', () => {
})
const authorized = await authorizer.authorize(
new Request('http://localhost/jobs/reminder/utilities', {
new Request('http://localhost/jobs/dispatch/test-dispatch', {
headers: {
'x-household-scheduler-secret': 'secret'
}
@@ -36,7 +36,7 @@ describe('createSchedulerRequestAuthorizer', () => {
})
const authorized = await authorizer.authorize(
new Request('http://localhost/jobs/reminder/utilities', {
new Request('http://localhost/jobs/dispatch/test-dispatch', {
headers: {
authorization: 'Bearer signed-id-token'
}
@@ -63,7 +63,7 @@ describe('createSchedulerRequestAuthorizer', () => {
})
const authorized = await authorizer.authorize(
new Request('http://localhost/jobs/reminder/utilities', {
new Request('http://localhost/jobs/dispatch/test-dispatch', {
headers: {
authorization: 'Bearer signed-id-token'
}

View File

@@ -564,7 +564,7 @@ describe('createBotWebhookServer', () => {
test('rejects scheduler request with missing secret', async () => {
const response = await server.fetch(
new Request('http://localhost/jobs/reminder/utilities', {
new Request('http://localhost/jobs/dispatch/test-dispatch', {
method: 'POST',
body: JSON.stringify({ period: '2026-03' })
})
@@ -575,7 +575,7 @@ describe('createBotWebhookServer', () => {
test('rejects non-post method for scheduler endpoint', async () => {
const response = await server.fetch(
new Request('http://localhost/jobs/reminder/utilities', {
new Request('http://localhost/jobs/dispatch/test-dispatch', {
method: 'GET',
headers: {
'x-household-scheduler-secret': 'scheduler-secret'
@@ -588,7 +588,7 @@ describe('createBotWebhookServer', () => {
test('accepts authorized scheduler request', async () => {
const response = await server.fetch(
new Request('http://localhost/jobs/reminder/rent-due', {
new Request('http://localhost/jobs/dispatch/test-dispatch', {
method: 'POST',
headers: {
'x-household-scheduler-secret': 'scheduler-secret'
@@ -600,7 +600,7 @@ describe('createBotWebhookServer', () => {
expect(response.status).toBe(200)
expect(await response.json()).toEqual({
ok: true,
reminderType: 'rent-due'
reminderType: 'dispatch/test-dispatch'
})
})
})

View File

@@ -297,9 +297,7 @@ export function createBotWebhookServer(options: BotWebhookServerOptions): {
options.miniAppDeletePayment?.path ?? '/api/miniapp/admin/payments/delete'
const miniAppLocalePreferencePath =
options.miniAppLocalePreference?.path ?? '/api/miniapp/preferences/locale'
const schedulerPathPrefix = options.scheduler
? (options.scheduler.pathPrefix ?? '/jobs/reminder')
: null
const schedulerPathPrefix = options.scheduler ? (options.scheduler.pathPrefix ?? '/jobs') : null
return {
fetch: async (request: Request) => {