From 575a68b3bb7ff646b2aae66a890be54a696e7557 Mon Sep 17 00:00:00 2001 From: claw Date: Mon, 30 Mar 2026 15:27:15 +0200 Subject: [PATCH] feat(bot): add self-hosted scheduled dispatch support Co-authored-by: claw --- .env.example | 4 + apps/bot/Dockerfile | 12 +- apps/bot/package.json | 2 +- apps/bot/src/app.ts | 34 +++-- apps/bot/src/config.ts | 13 ++ .../src/scheduled-dispatch-handler.test.ts | 2 + apps/bot/src/scheduled-dispatch-handler.ts | 139 +++++++++++++++--- apps/bot/src/scheduler-runner.ts | 83 +++++++++++ ...elf-hosted-scheduled-dispatch-scheduler.ts | 27 ++++ .../src/scheduled-dispatch-repository.ts | 22 ++- .../src/scheduled-dispatch-service.test.ts | 13 ++ .../src/scheduled-dispatch-service.ts | 9 ++ packages/ports/src/scheduled-dispatches.ts | 11 +- 13 files changed, 331 insertions(+), 40 deletions(-) create mode 100644 apps/bot/src/scheduler-runner.ts create mode 100644 apps/bot/src/self-hosted-scheduled-dispatch-scheduler.ts diff --git a/.env.example b/.env.example index 32ede4f..bdfdc97 100644 --- a/.env.example +++ b/.env.example @@ -37,6 +37,10 @@ SCHEDULER_OIDC_ALLOWED_EMAILS=scheduler-invoker@your-project.iam.gserviceaccount # Scheduled dispatches # Leave blank to disable scheduled dispatch locally. +# Options: +# - self-hosted (VPS / Docker Compose poller) +# - gcp-cloud-tasks (Cloud Run / GCP) +# - aws-eventbridge (AWS) SCHEDULED_DISPATCH_PROVIDER= # GCP Cloud Tasks diff --git a/apps/bot/Dockerfile b/apps/bot/Dockerfile index cf2300c..d27982c 100644 --- a/apps/bot/Dockerfile +++ b/apps/bot/Dockerfile @@ -1,6 +1,6 @@ # syntax=docker/dockerfile:1.7 -FROM oven/bun:1.3.10 AS deps +FROM oven/bun:1.3.10-alpine AS deps WORKDIR /app COPY bun.lock package.json tsconfig.base.json ./ @@ -25,19 +25,23 @@ WORKDIR /app COPY apps ./apps COPY packages ./packages -RUN bun run --filter @household/bot build +RUN bun run --filter @household/bot build \ + && mkdir -p packages/db/dist \ + && bun build packages/db/src/migrate.ts --outdir packages/db/dist --target bun -FROM oven/bun:1.3.10 AS runtime +FROM oven/bun:1.3.10-alpine AS runtime WORKDIR /app ENV NODE_ENV=production ENV PORT=8080 COPY --from=build /app/apps/bot/dist ./apps/bot/dist +COPY --from=build /app/packages/db/dist ./packages/db/dist +COPY --from=build /app/packages/db/drizzle ./packages/db/drizzle EXPOSE 8080 HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ - CMD bun -e "fetch('http://127.0.0.1:' + (process.env.PORT ?? '8080') + '/health').then((res) => process.exit(res.ok ? 0 : 1)).catch(() => process.exit(1))" + CMD bun -e "fetch('http://127.0.0.1:' + (process.env.PORT ?? '8080') + '/healthz').then((res) => process.exit(res.ok ? 0 : 1)).catch(() => process.exit(1))" CMD ["bun", "apps/bot/dist/index.js"] diff --git a/apps/bot/package.json b/apps/bot/package.json index d1449db..5b6d649 100644 --- a/apps/bot/package.json +++ b/apps/bot/package.json @@ -4,7 +4,7 @@ "type": "module", "scripts": { "dev": "bun run src/index.ts", - "build": "bun build src/index.ts src/lambda.ts --outdir dist --target bun", + "build": "bun build src/index.ts src/lambda.ts src/scheduler-runner.ts --outdir dist --target bun", "typecheck": "tsgo --project tsconfig.json --noEmit", "test": "bun test --pass-with-no-tests", "lint": "oxlint \"src\"" diff --git a/apps/bot/src/app.ts b/apps/bot/src/app.ts index 6399992..c97a78a 100644 --- a/apps/bot/src/app.ts +++ b/apps/bot/src/app.ts @@ -39,6 +39,7 @@ import { registerHouseholdSetupCommands } from './household-setup' import { HouseholdContextCache } from './household-context-cache' import { createAwsScheduledDispatchScheduler } from './aws-scheduled-dispatch-scheduler' import { createGcpScheduledDispatchScheduler } from './gcp-scheduled-dispatch-scheduler' +import { createSelfHostedScheduledDispatchScheduler } from './self-hosted-scheduled-dispatch-scheduler' import { createMiniAppAuthHandler, createMiniAppJoinHandler } from './miniapp-auth' import { createMiniAppApproveMemberHandler, @@ -139,21 +140,24 @@ export async function createBotRuntimeApp(): Promise { ? createDbScheduledDispatchRepository(runtime.databaseUrl) : null const scheduledDispatchScheduler = - runtime.scheduledDispatch && runtime.schedulerSharedSecret + runtime.scheduledDispatch && + (runtime.scheduledDispatch.provider === 'self-hosted' || runtime.schedulerSharedSecret) ? runtime.scheduledDispatch.provider === 'gcp-cloud-tasks' ? createGcpScheduledDispatchScheduler({ projectId: runtime.scheduledDispatch.projectId, location: runtime.scheduledDispatch.location, queue: runtime.scheduledDispatch.queue, publicBaseUrl: runtime.scheduledDispatch.publicBaseUrl, - sharedSecret: runtime.schedulerSharedSecret - }) - : createAwsScheduledDispatchScheduler({ - region: runtime.scheduledDispatch.region, - targetLambdaArn: runtime.scheduledDispatch.targetLambdaArn, - roleArn: runtime.scheduledDispatch.roleArn, - groupName: runtime.scheduledDispatch.groupName + sharedSecret: runtime.schedulerSharedSecret! }) + : runtime.scheduledDispatch.provider === 'aws-eventbridge' + ? createAwsScheduledDispatchScheduler({ + region: runtime.scheduledDispatch.region, + targetLambdaArn: runtime.scheduledDispatch.targetLambdaArn, + roleArn: runtime.scheduledDispatch.roleArn, + groupName: runtime.scheduledDispatch.groupName + }) + : createSelfHostedScheduledDispatchScheduler() : null const scheduledDispatchService = scheduledDispatchRepositoryClient && @@ -514,7 +518,7 @@ export async function createBotRuntimeApp(): Promise { event: 'runtime.feature_disabled', feature: 'scheduled-dispatch' }, - 'Scheduled dispatch is disabled. Configure DATABASE_URL, SCHEDULED_DISPATCH_PROVIDER, and scheduler auth to enable reminder delivery.' + 'Scheduled dispatch is disabled. Configure DATABASE_URL and SCHEDULED_DISPATCH_PROVIDER to enable reminder delivery.' ) } @@ -933,6 +937,12 @@ export async function createBotRuntimeApp(): Promise { oidcAllowedEmails: runtime.schedulerOidcAllowedEmails }).authorize, handler: async (request, jobPath) => { + if (jobPath === 'dispatch-due') { + return scheduledDispatchHandler + ? scheduledDispatchHandler.handleDueDispatches(request) + : new Response('Not Found', { status: 404 }) + } + if (jobPath.startsWith('dispatch/')) { return scheduledDispatchHandler ? scheduledDispatchHandler.handle(request, jobPath.slice('dispatch/'.length)) @@ -949,6 +959,12 @@ export async function createBotRuntimeApp(): Promise { oidcAllowedEmails: runtime.schedulerOidcAllowedEmails }).authorize, handler: async (request, jobPath) => { + if (jobPath === 'dispatch-due') { + return scheduledDispatchHandler + ? scheduledDispatchHandler.handleDueDispatches(request) + : new Response('Not Found', { status: 404 }) + } + if (jobPath.startsWith('dispatch/')) { return scheduledDispatchHandler ? scheduledDispatchHandler.handle(request, jobPath.slice('dispatch/'.length)) diff --git a/apps/bot/src/config.ts b/apps/bot/src/config.ts index e98eda5..0bc8d3f 100644 --- a/apps/bot/src/config.ts +++ b/apps/bot/src/config.ts @@ -28,6 +28,9 @@ export interface BotRuntimeConfig { roleArn: string groupName: string } + | { + provider: 'self-hosted' + } | undefined openaiApiKey?: string purchaseParserModel: string @@ -148,6 +151,12 @@ function parseScheduledDispatchConfig( } } + if (provider === 'self-hosted') { + return { + provider + } + } + throw new Error(`Invalid SCHEDULED_DISPATCH_PROVIDER value: ${provider}`) } @@ -239,6 +248,10 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu runtime.schedulerSharedSecret = schedulerSharedSecret } if (scheduledDispatch !== undefined) { + if (scheduledDispatch.provider === 'self-hosted' && schedulerSharedSecret === undefined) { + throw new Error('Self-hosted scheduled dispatch requires SCHEDULER_SHARED_SECRET') + } + runtime.scheduledDispatch = scheduledDispatch } if (miniAppUrl !== undefined) { diff --git a/apps/bot/src/scheduled-dispatch-handler.test.ts b/apps/bot/src/scheduled-dispatch-handler.test.ts index 68ba6d7..96a9019 100644 --- a/apps/bot/src/scheduled-dispatch-handler.test.ts +++ b/apps/bot/src/scheduled-dispatch-handler.test.ts @@ -76,6 +76,7 @@ describe('createScheduledDispatchHandler', () => { cancelAdHocNotification: async () => {}, reconcileHouseholdBuiltInDispatches: async () => {}, reconcileAllBuiltInDispatches: async () => {}, + listDueDispatches: async () => [dispatch], getDispatchById: async () => dispatch, claimDispatch: async () => true, releaseDispatch: async () => {}, @@ -162,6 +163,7 @@ describe('createScheduledDispatchHandler', () => { cancelAdHocNotification: async () => {}, reconcileHouseholdBuiltInDispatches: async () => {}, reconcileAllBuiltInDispatches: async () => {}, + listDueDispatches: async () => [dispatch], getDispatchById: async () => dispatch, claimDispatch: async () => true, releaseDispatch: async () => { diff --git a/apps/bot/src/scheduled-dispatch-handler.ts b/apps/bot/src/scheduled-dispatch-handler.ts index 36f1f7e..f53d457 100644 --- a/apps/bot/src/scheduled-dispatch-handler.ts +++ b/apps/bot/src/scheduled-dispatch-handler.ts @@ -32,6 +32,23 @@ function builtInReminderType(kind: 'utilities' | 'rent_warning' | 'rent_due'): R } } +function parsePositiveInteger( + value: string | null, + fallback: number, + { min, max }: { min: number; max: number } +): number { + if (!value) { + return fallback + } + + const parsed = Number(value) + if (!Number.isInteger(parsed)) { + return fallback + } + + return Math.min(Math.max(parsed, min), max) +} + export function createScheduledDispatchHandler(options: { scheduledDispatchService: ScheduledDispatchService adHocNotificationRepository: Pick< @@ -59,6 +76,7 @@ export function createScheduledDispatchHandler(options: { logger?: Logger }): { handle: (request: Request, dispatchId: string) => Promise + handleDueDispatches: (request: Request) => Promise } { async function sendAdHocNotification(dispatchId: string) { const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId) @@ -236,33 +254,45 @@ export function createScheduledDispatchHandler(options: { } } + async function handleDispatch(dispatchId: string) { + const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId) + if (!dispatch) { + return { + dispatchId, + outcome: 'noop' as const, + householdId: null, + kind: null + } + } + + const result = + dispatch.kind === 'ad_hoc_notification' + ? await sendAdHocNotification(dispatchId) + : await sendBuiltInReminder(dispatchId) + + options.logger?.info( + { + event: 'scheduler.scheduled_dispatch.handle', + dispatchId, + householdId: dispatch.householdId, + kind: dispatch.kind, + outcome: result.outcome + }, + 'Scheduled dispatch handled' + ) + + return { + dispatchId, + outcome: result.outcome, + householdId: dispatch.householdId, + kind: dispatch.kind + } + } + return { handle: async (_request, dispatchId) => { try { - const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId) - if (!dispatch) { - return json({ - ok: true, - dispatchId, - outcome: 'noop' - }) - } - - const result = - dispatch.kind === 'ad_hoc_notification' - ? await sendAdHocNotification(dispatchId) - : await sendBuiltInReminder(dispatchId) - - options.logger?.info( - { - event: 'scheduler.scheduled_dispatch.handle', - dispatchId, - householdId: dispatch.householdId, - kind: dispatch.kind, - outcome: result.outcome - }, - 'Scheduled dispatch handled' - ) + const result = await handleDispatch(dispatchId) return json({ ok: true, @@ -287,6 +317,67 @@ export function createScheduledDispatchHandler(options: { 500 ) } + }, + + handleDueDispatches: async (request) => { + const url = new URL(request.url) + const limit = parsePositiveInteger(url.searchParams.get('limit'), 25, { + min: 1, + max: 100 + }) + + try { + const dueDispatches = await options.scheduledDispatchService.listDueDispatches({ limit }) + const results: Array<{ + dispatchId: string + outcome: string + householdId: string | null + kind: string | null + }> = [] + + for (const dispatch of dueDispatches) { + try { + results.push(await handleDispatch(dispatch.id)) + } catch (error) { + options.logger?.error( + { + event: 'scheduler.scheduled_dispatch.bulk_failed', + dispatchId: dispatch.id, + error: error instanceof Error ? error.message : String(error) + }, + 'Scheduled dispatch failed during bulk run' + ) + results.push({ + dispatchId: dispatch.id, + outcome: 'error', + householdId: dispatch.householdId, + kind: dispatch.kind + }) + } + } + + return json({ + ok: true, + scanned: dueDispatches.length, + results + }) + } catch (error) { + options.logger?.error( + { + event: 'scheduler.scheduled_dispatch.bulk_scan_failed', + error: error instanceof Error ? error.message : String(error) + }, + 'Scheduled dispatch bulk scan failed' + ) + + return json( + { + ok: false, + error: error instanceof Error ? error.message : 'Unknown error' + }, + 500 + ) + } } } } diff --git a/apps/bot/src/scheduler-runner.ts b/apps/bot/src/scheduler-runner.ts new file mode 100644 index 0000000..00b25ba --- /dev/null +++ b/apps/bot/src/scheduler-runner.ts @@ -0,0 +1,83 @@ +function requireEnv(name: string): string { + const value = process.env[name]?.trim() + if (!value) { + throw new Error(`${name} environment variable is required`) + } + return value +} + +function parsePositiveInteger(name: string, fallback: number): number { + const raw = process.env[name]?.trim() + if (!raw) { + return fallback + } + + const parsed = Number(raw) + if (!Number.isInteger(parsed) || parsed <= 0) { + throw new Error(`Invalid ${name} value: ${raw}`) + } + + return parsed +} + +async function runOnce() { + const baseUrl = requireEnv('BOT_INTERNAL_BASE_URL').replace(/\/$/, '') + const schedulerSecret = requireEnv('SCHEDULER_SHARED_SECRET') + const dueScanLimit = parsePositiveInteger('SCHEDULER_DUE_SCAN_LIMIT', 25) + + const response = await fetch(`${baseUrl}/jobs/dispatch-due?limit=${dueScanLimit}`, { + method: 'POST', + headers: { + 'x-household-scheduler-secret': schedulerSecret + } + }) + + if (!response.ok) { + throw new Error(`Scheduler scan failed with status ${response.status}`) + } + + const payload = (await response.json()) as { + ok?: boolean + scanned?: number + error?: string + } + + if (payload.ok !== true) { + throw new Error(payload.error ?? 'Scheduler scan failed') + } + + console.log(JSON.stringify({ event: 'scheduler.tick', scanned: payload.scanned ?? 0 })) +} + +async function main() { + const intervalMs = parsePositiveInteger('SCHEDULER_POLL_INTERVAL_MS', 60_000) + let stopping = false + + const stop = () => { + stopping = true + } + + process.on('SIGINT', stop) + process.on('SIGTERM', stop) + + while (!stopping) { + try { + await runOnce() + } catch (error) { + console.error( + JSON.stringify({ + event: 'scheduler.tick_failed', + error: error instanceof Error ? error.message : String(error) + }) + ) + } + + if (stopping) { + break + } + + await Bun.sleep(intervalMs) + } +} + +await main() diff --git a/apps/bot/src/self-hosted-scheduled-dispatch-scheduler.ts b/apps/bot/src/self-hosted-scheduled-dispatch-scheduler.ts new file mode 100644 index 0000000..2ac44c6 --- /dev/null +++ b/apps/bot/src/self-hosted-scheduled-dispatch-scheduler.ts @@ -0,0 +1,27 @@ +import type { + ScheduleOneShotDispatchInput, + ScheduleOneShotDispatchResult, + ScheduledDispatchScheduler +} from '@household/ports' + +function providerDispatchId(dispatchId: string): string { + return `self-hosted:${dispatchId}` +} + +export function createSelfHostedScheduledDispatchScheduler(): ScheduledDispatchScheduler { + return { + provider: 'self-hosted', + + async scheduleOneShotDispatch( + dispatchInput: ScheduleOneShotDispatchInput + ): Promise { + return { + providerDispatchId: providerDispatchId(dispatchInput.dispatchId) + } + }, + + async cancelDispatch(_providerDispatchId) { + return + } + } +} diff --git a/packages/adapters-db/src/scheduled-dispatch-repository.ts b/packages/adapters-db/src/scheduled-dispatch-repository.ts index fa4761d..7b70487 100644 --- a/packages/adapters-db/src/scheduled-dispatch-repository.ts +++ b/packages/adapters-db/src/scheduled-dispatch-repository.ts @@ -1,4 +1,4 @@ -import { and, asc, eq } from 'drizzle-orm' +import { and, asc, eq, lte } from 'drizzle-orm' import { createDbClient, schema } from '@household/db' import { instantFromDatabaseValue, instantToDate, nowInstant } from '@household/domain' @@ -129,6 +129,26 @@ export function createDbScheduledDispatchRepository(databaseUrl: string): { return rows.map(mapScheduledDispatch) }, + async listDueScheduledDispatches(input) { + const filters = [ + eq(schema.scheduledDispatches.status, 'scheduled'), + lte(schema.scheduledDispatches.dueAt, instantToDate(input.dueBefore)) + ] + + if (input.provider) { + filters.push(eq(schema.scheduledDispatches.provider, input.provider)) + } + + const rows = await db + .select(scheduledDispatchSelect()) + .from(schema.scheduledDispatches) + .where(and(...filters)) + .orderBy(asc(schema.scheduledDispatches.dueAt), asc(schema.scheduledDispatches.createdAt)) + .limit(input.limit) + + return rows.map(mapScheduledDispatch) + }, + async updateScheduledDispatch(input) { const updates: Record = { updatedAt: instantToDate(input.updatedAt) diff --git a/packages/application/src/scheduled-dispatch-service.test.ts b/packages/application/src/scheduled-dispatch-service.test.ts index 452d1b4..a647a83 100644 --- a/packages/application/src/scheduled-dispatch-service.test.ts +++ b/packages/application/src/scheduled-dispatch-service.test.ts @@ -68,6 +68,19 @@ class ScheduledDispatchRepositoryStub implements ScheduledDispatchRepository { return [...this.dispatches.values()].filter((dispatch) => dispatch.householdId === householdId) } + async listDueScheduledDispatches(input: { + dueBefore: Temporal.Instant + provider?: ScheduledDispatchRecord['provider'] + limit: number + }): Promise { + return [...this.dispatches.values()] + .filter((dispatch) => dispatch.status === 'scheduled') + .filter((dispatch) => dispatch.dueAt.epochMilliseconds <= input.dueBefore.epochMilliseconds) + .filter((dispatch) => (input.provider ? dispatch.provider === input.provider : true)) + .sort((left, right) => left.dueAt.epochMilliseconds - right.dueAt.epochMilliseconds) + .slice(0, input.limit) + } + async updateScheduledDispatch(input: { dispatchId: string dueAt?: Temporal.Instant diff --git a/packages/application/src/scheduled-dispatch-service.ts b/packages/application/src/scheduled-dispatch-service.ts index 538ec55..d207ff7 100644 --- a/packages/application/src/scheduled-dispatch-service.ts +++ b/packages/application/src/scheduled-dispatch-service.ts @@ -86,6 +86,7 @@ export interface ScheduledDispatchService { cancelAdHocNotification(notificationId: string, cancelledAt?: Instant): Promise reconcileHouseholdBuiltInDispatches(householdId: string, asOf?: Instant): Promise reconcileAllBuiltInDispatches(asOf?: Instant): Promise + listDueDispatches(input?: { asOf?: Instant; limit?: number }): Promise getDispatchById(dispatchId: string): Promise claimDispatch(dispatchId: string): Promise releaseDispatch(dispatchId: string): Promise @@ -307,6 +308,14 @@ export function createScheduledDispatchService(input: { } }, + listDueDispatches(inputValue) { + return input.repository.listDueScheduledDispatches({ + dueBefore: inputValue?.asOf ?? nowInstant(), + provider: input.scheduler.provider, + limit: inputValue?.limit ?? 25 + }) + }, + getDispatchById(dispatchId) { return input.repository.getScheduledDispatchById(dispatchId) }, diff --git a/packages/ports/src/scheduled-dispatches.ts b/packages/ports/src/scheduled-dispatches.ts index 719e3b6..7c192bd 100644 --- a/packages/ports/src/scheduled-dispatches.ts +++ b/packages/ports/src/scheduled-dispatches.ts @@ -7,7 +7,11 @@ export const SCHEDULED_DISPATCH_KINDS = [ 'rent_due' ] as const export const SCHEDULED_DISPATCH_STATUSES = ['scheduled', 'sent', 'cancelled'] as const -export const SCHEDULED_DISPATCH_PROVIDERS = ['gcp-cloud-tasks', 'aws-eventbridge'] as const +export const SCHEDULED_DISPATCH_PROVIDERS = [ + 'gcp-cloud-tasks', + 'aws-eventbridge', + 'self-hosted' +] as const export type ScheduledDispatchKind = (typeof SCHEDULED_DISPATCH_KINDS)[number] export type ScheduledDispatchStatus = (typeof SCHEDULED_DISPATCH_STATUSES)[number] @@ -64,6 +68,11 @@ export interface ScheduledDispatchRepository { listScheduledDispatchesForHousehold( householdId: string ): Promise + listDueScheduledDispatches(input: { + dueBefore: Instant + provider?: ScheduledDispatchProvider + limit: number + }): Promise updateScheduledDispatch( input: UpdateScheduledDispatchInput ): Promise