feat(bot): add self-hosted scheduled dispatch support

Co-authored-by: claw <stanislavkalishin+claw@gmail.com>
This commit is contained in:
2026-03-30 15:27:15 +02:00
parent 94c1f48794
commit 575a68b3bb
13 changed files with 331 additions and 40 deletions

View File

@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1.7
FROM oven/bun:1.3.10 AS deps
FROM oven/bun:1.3.10-alpine AS deps
WORKDIR /app
COPY bun.lock package.json tsconfig.base.json ./
@@ -25,19 +25,23 @@ WORKDIR /app
COPY apps ./apps
COPY packages ./packages
RUN bun run --filter @household/bot build
RUN bun run --filter @household/bot build \
&& mkdir -p packages/db/dist \
&& bun build packages/db/src/migrate.ts --outdir packages/db/dist --target bun
FROM oven/bun:1.3.10 AS runtime
FROM oven/bun:1.3.10-alpine AS runtime
WORKDIR /app
ENV NODE_ENV=production
ENV PORT=8080
COPY --from=build /app/apps/bot/dist ./apps/bot/dist
COPY --from=build /app/packages/db/dist ./packages/db/dist
COPY --from=build /app/packages/db/drizzle ./packages/db/drizzle
EXPOSE 8080
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD bun -e "fetch('http://127.0.0.1:' + (process.env.PORT ?? '8080') + '/health').then((res) => process.exit(res.ok ? 0 : 1)).catch(() => process.exit(1))"
CMD bun -e "fetch('http://127.0.0.1:' + (process.env.PORT ?? '8080') + '/healthz').then((res) => process.exit(res.ok ? 0 : 1)).catch(() => process.exit(1))"
CMD ["bun", "apps/bot/dist/index.js"]

View File

@@ -4,7 +4,7 @@
"type": "module",
"scripts": {
"dev": "bun run src/index.ts",
"build": "bun build src/index.ts src/lambda.ts --outdir dist --target bun",
"build": "bun build src/index.ts src/lambda.ts src/scheduler-runner.ts --outdir dist --target bun",
"typecheck": "tsgo --project tsconfig.json --noEmit",
"test": "bun test --pass-with-no-tests",
"lint": "oxlint \"src\""

View File

@@ -39,6 +39,7 @@ import { registerHouseholdSetupCommands } from './household-setup'
import { HouseholdContextCache } from './household-context-cache'
import { createAwsScheduledDispatchScheduler } from './aws-scheduled-dispatch-scheduler'
import { createGcpScheduledDispatchScheduler } from './gcp-scheduled-dispatch-scheduler'
import { createSelfHostedScheduledDispatchScheduler } from './self-hosted-scheduled-dispatch-scheduler'
import { createMiniAppAuthHandler, createMiniAppJoinHandler } from './miniapp-auth'
import {
createMiniAppApproveMemberHandler,
@@ -139,21 +140,24 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
? createDbScheduledDispatchRepository(runtime.databaseUrl)
: null
const scheduledDispatchScheduler =
runtime.scheduledDispatch && runtime.schedulerSharedSecret
runtime.scheduledDispatch &&
(runtime.scheduledDispatch.provider === 'self-hosted' || runtime.schedulerSharedSecret)
? runtime.scheduledDispatch.provider === 'gcp-cloud-tasks'
? createGcpScheduledDispatchScheduler({
projectId: runtime.scheduledDispatch.projectId,
location: runtime.scheduledDispatch.location,
queue: runtime.scheduledDispatch.queue,
publicBaseUrl: runtime.scheduledDispatch.publicBaseUrl,
sharedSecret: runtime.schedulerSharedSecret
})
: createAwsScheduledDispatchScheduler({
region: runtime.scheduledDispatch.region,
targetLambdaArn: runtime.scheduledDispatch.targetLambdaArn,
roleArn: runtime.scheduledDispatch.roleArn,
groupName: runtime.scheduledDispatch.groupName
sharedSecret: runtime.schedulerSharedSecret!
})
: runtime.scheduledDispatch.provider === 'aws-eventbridge'
? createAwsScheduledDispatchScheduler({
region: runtime.scheduledDispatch.region,
targetLambdaArn: runtime.scheduledDispatch.targetLambdaArn,
roleArn: runtime.scheduledDispatch.roleArn,
groupName: runtime.scheduledDispatch.groupName
})
: createSelfHostedScheduledDispatchScheduler()
: null
const scheduledDispatchService =
scheduledDispatchRepositoryClient &&
@@ -514,7 +518,7 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
event: 'runtime.feature_disabled',
feature: 'scheduled-dispatch'
},
'Scheduled dispatch is disabled. Configure DATABASE_URL, SCHEDULED_DISPATCH_PROVIDER, and scheduler auth to enable reminder delivery.'
'Scheduled dispatch is disabled. Configure DATABASE_URL and SCHEDULED_DISPATCH_PROVIDER to enable reminder delivery.'
)
}
@@ -933,6 +937,12 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
oidcAllowedEmails: runtime.schedulerOidcAllowedEmails
}).authorize,
handler: async (request, jobPath) => {
if (jobPath === 'dispatch-due') {
return scheduledDispatchHandler
? scheduledDispatchHandler.handleDueDispatches(request)
: new Response('Not Found', { status: 404 })
}
if (jobPath.startsWith('dispatch/')) {
return scheduledDispatchHandler
? scheduledDispatchHandler.handle(request, jobPath.slice('dispatch/'.length))
@@ -949,6 +959,12 @@ export async function createBotRuntimeApp(): Promise<BotRuntimeApp> {
oidcAllowedEmails: runtime.schedulerOidcAllowedEmails
}).authorize,
handler: async (request, jobPath) => {
if (jobPath === 'dispatch-due') {
return scheduledDispatchHandler
? scheduledDispatchHandler.handleDueDispatches(request)
: new Response('Not Found', { status: 404 })
}
if (jobPath.startsWith('dispatch/')) {
return scheduledDispatchHandler
? scheduledDispatchHandler.handle(request, jobPath.slice('dispatch/'.length))

View File

@@ -28,6 +28,9 @@ export interface BotRuntimeConfig {
roleArn: string
groupName: string
}
| {
provider: 'self-hosted'
}
| undefined
openaiApiKey?: string
purchaseParserModel: string
@@ -148,6 +151,12 @@ function parseScheduledDispatchConfig(
}
}
if (provider === 'self-hosted') {
return {
provider
}
}
throw new Error(`Invalid SCHEDULED_DISPATCH_PROVIDER value: ${provider}`)
}
@@ -239,6 +248,10 @@ export function getBotRuntimeConfig(env: NodeJS.ProcessEnv = process.env): BotRu
runtime.schedulerSharedSecret = schedulerSharedSecret
}
if (scheduledDispatch !== undefined) {
if (scheduledDispatch.provider === 'self-hosted' && schedulerSharedSecret === undefined) {
throw new Error('Self-hosted scheduled dispatch requires SCHEDULER_SHARED_SECRET')
}
runtime.scheduledDispatch = scheduledDispatch
}
if (miniAppUrl !== undefined) {

View File

@@ -76,6 +76,7 @@ describe('createScheduledDispatchHandler', () => {
cancelAdHocNotification: async () => {},
reconcileHouseholdBuiltInDispatches: async () => {},
reconcileAllBuiltInDispatches: async () => {},
listDueDispatches: async () => [dispatch],
getDispatchById: async () => dispatch,
claimDispatch: async () => true,
releaseDispatch: async () => {},
@@ -162,6 +163,7 @@ describe('createScheduledDispatchHandler', () => {
cancelAdHocNotification: async () => {},
reconcileHouseholdBuiltInDispatches: async () => {},
reconcileAllBuiltInDispatches: async () => {},
listDueDispatches: async () => [dispatch],
getDispatchById: async () => dispatch,
claimDispatch: async () => true,
releaseDispatch: async () => {

View File

@@ -32,6 +32,23 @@ function builtInReminderType(kind: 'utilities' | 'rent_warning' | 'rent_due'): R
}
}
function parsePositiveInteger(
value: string | null,
fallback: number,
{ min, max }: { min: number; max: number }
): number {
if (!value) {
return fallback
}
const parsed = Number(value)
if (!Number.isInteger(parsed)) {
return fallback
}
return Math.min(Math.max(parsed, min), max)
}
export function createScheduledDispatchHandler(options: {
scheduledDispatchService: ScheduledDispatchService
adHocNotificationRepository: Pick<
@@ -59,6 +76,7 @@ export function createScheduledDispatchHandler(options: {
logger?: Logger
}): {
handle: (request: Request, dispatchId: string) => Promise<Response>
handleDueDispatches: (request: Request) => Promise<Response>
} {
async function sendAdHocNotification(dispatchId: string) {
const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId)
@@ -236,33 +254,45 @@ export function createScheduledDispatchHandler(options: {
}
}
async function handleDispatch(dispatchId: string) {
const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId)
if (!dispatch) {
return {
dispatchId,
outcome: 'noop' as const,
householdId: null,
kind: null
}
}
const result =
dispatch.kind === 'ad_hoc_notification'
? await sendAdHocNotification(dispatchId)
: await sendBuiltInReminder(dispatchId)
options.logger?.info(
{
event: 'scheduler.scheduled_dispatch.handle',
dispatchId,
householdId: dispatch.householdId,
kind: dispatch.kind,
outcome: result.outcome
},
'Scheduled dispatch handled'
)
return {
dispatchId,
outcome: result.outcome,
householdId: dispatch.householdId,
kind: dispatch.kind
}
}
return {
handle: async (_request, dispatchId) => {
try {
const dispatch = await options.scheduledDispatchService.getDispatchById(dispatchId)
if (!dispatch) {
return json({
ok: true,
dispatchId,
outcome: 'noop'
})
}
const result =
dispatch.kind === 'ad_hoc_notification'
? await sendAdHocNotification(dispatchId)
: await sendBuiltInReminder(dispatchId)
options.logger?.info(
{
event: 'scheduler.scheduled_dispatch.handle',
dispatchId,
householdId: dispatch.householdId,
kind: dispatch.kind,
outcome: result.outcome
},
'Scheduled dispatch handled'
)
const result = await handleDispatch(dispatchId)
return json({
ok: true,
@@ -287,6 +317,67 @@ export function createScheduledDispatchHandler(options: {
500
)
}
},
handleDueDispatches: async (request) => {
const url = new URL(request.url)
const limit = parsePositiveInteger(url.searchParams.get('limit'), 25, {
min: 1,
max: 100
})
try {
const dueDispatches = await options.scheduledDispatchService.listDueDispatches({ limit })
const results: Array<{
dispatchId: string
outcome: string
householdId: string | null
kind: string | null
}> = []
for (const dispatch of dueDispatches) {
try {
results.push(await handleDispatch(dispatch.id))
} catch (error) {
options.logger?.error(
{
event: 'scheduler.scheduled_dispatch.bulk_failed',
dispatchId: dispatch.id,
error: error instanceof Error ? error.message : String(error)
},
'Scheduled dispatch failed during bulk run'
)
results.push({
dispatchId: dispatch.id,
outcome: 'error',
householdId: dispatch.householdId,
kind: dispatch.kind
})
}
}
return json({
ok: true,
scanned: dueDispatches.length,
results
})
} catch (error) {
options.logger?.error(
{
event: 'scheduler.scheduled_dispatch.bulk_scan_failed',
error: error instanceof Error ? error.message : String(error)
},
'Scheduled dispatch bulk scan failed'
)
return json(
{
ok: false,
error: error instanceof Error ? error.message : 'Unknown error'
},
500
)
}
}
}
}

View File

@@ -0,0 +1,83 @@
function requireEnv(name: string): string {
const value = process.env[name]?.trim()
if (!value) {
throw new Error(`${name} environment variable is required`)
}
return value
}
function parsePositiveInteger(name: string, fallback: number): number {
const raw = process.env[name]?.trim()
if (!raw) {
return fallback
}
const parsed = Number(raw)
if (!Number.isInteger(parsed) || parsed <= 0) {
throw new Error(`Invalid ${name} value: ${raw}`)
}
return parsed
}
async function runOnce() {
const baseUrl = requireEnv('BOT_INTERNAL_BASE_URL').replace(/\/$/, '')
const schedulerSecret = requireEnv('SCHEDULER_SHARED_SECRET')
const dueScanLimit = parsePositiveInteger('SCHEDULER_DUE_SCAN_LIMIT', 25)
const response = await fetch(`${baseUrl}/jobs/dispatch-due?limit=${dueScanLimit}`, {
method: 'POST',
headers: {
'x-household-scheduler-secret': schedulerSecret
}
})
if (!response.ok) {
throw new Error(`Scheduler scan failed with status ${response.status}`)
}
const payload = (await response.json()) as {
ok?: boolean
scanned?: number
error?: string
}
if (payload.ok !== true) {
throw new Error(payload.error ?? 'Scheduler scan failed')
}
console.log(JSON.stringify({ event: 'scheduler.tick', scanned: payload.scanned ?? 0 }))
}
async function main() {
const intervalMs = parsePositiveInteger('SCHEDULER_POLL_INTERVAL_MS', 60_000)
let stopping = false
const stop = () => {
stopping = true
}
process.on('SIGINT', stop)
process.on('SIGTERM', stop)
while (!stopping) {
try {
await runOnce()
} catch (error) {
console.error(
JSON.stringify({
event: 'scheduler.tick_failed',
error: error instanceof Error ? error.message : String(error)
})
)
}
if (stopping) {
break
}
await Bun.sleep(intervalMs)
}
}
await main()

View File

@@ -0,0 +1,27 @@
import type {
ScheduleOneShotDispatchInput,
ScheduleOneShotDispatchResult,
ScheduledDispatchScheduler
} from '@household/ports'
function providerDispatchId(dispatchId: string): string {
return `self-hosted:${dispatchId}`
}
export function createSelfHostedScheduledDispatchScheduler(): ScheduledDispatchScheduler {
return {
provider: 'self-hosted',
async scheduleOneShotDispatch(
dispatchInput: ScheduleOneShotDispatchInput
): Promise<ScheduleOneShotDispatchResult> {
return {
providerDispatchId: providerDispatchId(dispatchInput.dispatchId)
}
},
async cancelDispatch(_providerDispatchId) {
return
}
}
}