add a scheduler api

Signed-off-by: Fredrik Adelöw <freben@gmail.com>
This commit is contained in:
Fredrik Adelöw
2025-04-11 13:22:35 +02:00
parent f2a9429a20
commit d385854e2f
25 changed files with 662 additions and 68 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/backend-defaults': patch
---
The `DefaultSchedulerService` now accepts `HttpRouterService` and `PluginMetadataService` arguments. If you supply a router, the scheduler will register a REST API for listing and triggering tasks.
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-catalog-backend-module-backstage-openapi': patch
---
Do not swallow errors; instead allow them to bubble up to the task scheduler for better tracking and logging.
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/backend-plugin-api': patch
---
Minor doc comment update: the cron format overview now lists the valid value range of each field.
@@ -0,0 +1,47 @@
/*
* Copyright 2024 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @ts-check
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = async function up(knex) {
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
table
.text('last_run_error_json', 'longtext')
.nullable()
.comment(
'JSON serialized error object from the last task run, if it failed',
);
table
.dateTime('last_run_ended_at')
.nullable()
.comment('The last time that the task ended');
});
};
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = async function down(knex) {
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
table.dropColumn('last_run_error_json');
table.dropColumn('last_run_ended_at');
});
};
@@ -4,7 +4,9 @@
```ts
import { DatabaseService } from '@backstage/backend-plugin-api';
import { HttpRouterService } from '@backstage/backend-plugin-api';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginMetadataService } from '@backstage/backend-plugin-api';
import { RootLifecycleService } from '@backstage/backend-plugin-api';
import { SchedulerService } from '@backstage/backend-plugin-api';
import { ServiceFactory } from '@backstage/backend-plugin-api';
@@ -16,6 +18,8 @@ export class DefaultSchedulerService {
database: DatabaseService;
logger: LoggerService;
rootLifecycle?: RootLifecycleService;
httpRouter?: HttpRouterService;
pluginMetadata?: PluginMetadataService;
}): SchedulerService;
}
@@ -10,6 +10,8 @@
| `current_run_started_at` | `timestamp with time zone` | true | - | - |
| `current_run_ticket` | `text` | true | - | - |
| `id` | `character varying` | false | 255 | - |
| `last_run_ended_at` | `timestamp with time zone` | true | - | - |
| `last_run_error_json` | `text` | true | - | - |
| `next_run_start_at` | `timestamp with time zone` | true | - | - |
| `settings_json` | `text` | false | - | - |
@@ -18,12 +18,12 @@ import { resolvePackagePath } from '@backstage/backend-plugin-api';
import { Knex } from 'knex';
import { DB_MIGRATIONS_TABLE } from './tables';
export async function migrateBackendTasks(knex: Knex): Promise<void> {
const migrationsDir = resolvePackagePath(
'@backstage/backend-defaults',
'migrations/scheduler',
);
export const migrationsDir = resolvePackagePath(
'@backstage/backend-defaults',
'migrations/scheduler',
);
export async function migrateBackendTasks(knex: Knex): Promise<void> {
await knex.migrate.latest({
directory: migrationsDir,
tableName: DB_MIGRATIONS_TABLE,
@@ -0,0 +1,108 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Knex } from 'knex';
import { TestDatabases } from '@backstage/backend-test-utils';
import fs from 'fs';
import { migrationsDir } from './migrateBackendTasks';
const migrationsFiles = fs.readdirSync(migrationsDir).sort();
/** Runs a single `knex.migrate.up` step against the scheduler migrations directory. */
async function migrateUpOnce(knex: Knex): Promise<void> {
  const config = { directory: migrationsDir };
  await knex.migrate.up(config);
}
/** Runs a single `knex.migrate.down` step against the scheduler migrations directory. */
async function migrateDownOnce(knex: Knex): Promise<void> {
  const config = { directory: migrationsDir };
  await knex.migrate.down(config);
}
/**
 * Steps the database forward one migration at a time, as many times as there
 * are migration files sorting before `target`, so that `target` itself is
 * the next one to run.
 *
 * NOTE(review): this counts steps rather than inspecting the database, so it
 * presumably expects a database with no migrations applied yet — confirm.
 *
 * @throws Error if `target` is not among the files in the migrations directory
 */
async function migrateUntilBefore(knex: Knex, target: string): Promise<void> {
  const stepsNeeded = migrationsFiles.indexOf(target);
  if (stepsNeeded < 0) {
    throw new Error(`Migration ${target} not found`);
  }

  let stepsDone = 0;
  while (stepsDone < stepsNeeded) {
    await migrateUpOnce(knex);
    stepsDone += 1;
  }
}
jest.setTimeout(60_000);
describe('migrations', () => {
  const databases = TestDatabases.create();

  it.each(databases.eachSupportedId())(
    '20250411000000_last_run.js, %p',
    async databaseId => {
      const tasksTable = 'backstage_backend_tasks__tasks';
      // What the inserted row looks like while the new columns do not exist
      const rowWithoutLastRun = {
        id: 'i',
        settings_json: '{}',
        next_run_start_at: null,
        current_run_ticket: null,
        current_run_started_at: null,
        current_run_expires_at: null,
      };

      const knex = await databases.init(databaseId);
      await migrateUntilBefore(knex, '20250411000000_last_run.js');

      // Seed a row before the migration so that we can see it survive
      await knex(tasksTable).insert({
        id: 'i',
        settings_json: '{}',
      });
      await expect(knex(tasksTable)).resolves.toEqual([rowWithoutLastRun]);

      // Up: the new columns appear, are nullable, and can be written to
      await migrateUpOnce(knex);
      await knex(tasksTable)
        .where({ id: 'i' })
        .update({ last_run_error_json: 'error' });
      await expect(knex(tasksTable)).resolves.toEqual([
        {
          ...rowWithoutLastRun,
          last_run_ended_at: null,
          last_run_error_json: 'error',
        },
      ]);

      // Down: the new columns disappear again and the row is otherwise intact
      await migrateDownOnce(knex);
      await expect(knex(tasksTable)).resolves.toEqual([rowWithoutLastRun]);

      await knex.destroy();
    },
  );
});
@@ -20,8 +20,10 @@ export const DB_TASKS_TABLE = 'backstage_backend_tasks__tasks';
/**
 * The shape of a row in the scheduler tasks table (`DB_TASKS_TABLE`).
 *
 * The timestamp columns are typed as `Date | string`; presumably the concrete
 * type depends on the database driver in use — verify against the callers.
 */
export type DbTasksRow = {
// Identifier of the task
id: string;
// The task settings, stored as text (serialized JSON, judging by the column name)
settings_json: string;
next_run_start_at: Date;
next_run_start_at?: Date | string; // This can be null when in manual trigger mode
// The current_run_* columns are only set while a run is in progress; they
// are reset to null when the task is released or times out
current_run_ticket?: string;
current_run_started_at?: Date | string;
current_run_expires_at?: Date | string;
// JSON serialized error object from the last task run, if it failed
last_run_error_json?: string;
// The last time that the task ended
last_run_ended_at?: Date | string;
};
@@ -16,7 +16,9 @@
import {
DatabaseService,
HttpRouterService,
LoggerService,
PluginMetadataService,
RootLifecycleService,
SchedulerService,
} from '@backstage/backend-plugin-api';
@@ -36,6 +38,8 @@ export class DefaultSchedulerService {
database: DatabaseService;
logger: LoggerService;
rootLifecycle?: RootLifecycleService;
httpRouter?: HttpRouterService;
pluginMetadata?: PluginMetadataService;
}): SchedulerService {
const databaseFactory = once(async () => {
const knex = await options.database.getClient();
@@ -59,10 +63,15 @@ export class DefaultSchedulerService {
return knex;
});
return new PluginTaskSchedulerImpl(
const scheduler = new PluginTaskSchedulerImpl(
options.pluginMetadata?.getId() ?? 'unknown',
databaseFactory,
options.logger,
options.rootLifecycle,
);
options.httpRouter?.use(scheduler.getRouter());
return scheduler;
}
}
@@ -16,6 +16,9 @@
import { LocalTaskWorker } from './LocalTaskWorker';
import { mockServices } from '@backstage/backend-test-utils';
import waitFor from 'wait-for-expect';
jest.setTimeout(10_000);
describe('LocalTaskWorker', () => {
const logger = mockServices.logger.mock();
@@ -106,4 +109,108 @@ describe('LocalTaskWorker', () => {
expect(fn).toHaveBeenCalledTimes(2);
controller.abort();
});
it('goes through the expected states', async () => {
  // Every invocation takes 100 ms; only the second one fails
  const succeedAfterDelay = () =>
    new Promise<void>(resolve => setTimeout(resolve, 100));
  const failAfterDelay = () =>
    new Promise<void>((_, reject) =>
      setTimeout(() => reject(new Error('boo')), 100),
    );
  const fn = jest
    .fn()
    .mockImplementationOnce(succeedAfterDelay)
    .mockImplementationOnce(failAfterDelay)
    .mockImplementation(succeedAfterDelay);

  const controller = new AbortController();
  const worker = new LocalTaskWorker('a', fn, logger);

  // Polls until the worker reports exactly the given pair of states
  const expectStates = (
    taskState: Record<string, unknown>,
    workerState: Record<string, unknown>,
  ) =>
    waitFor(() => {
      expect(worker.taskState()).toEqual(taskState);
      expect(worker.workerState()).toEqual(workerState);
    });

  const timestamp = expect.any(String);

  try {
    worker.start(
      {
        version: 2,
        initialDelayDuration: 'PT0.5S',
        cadence: 'PT0.5S',
        timeoutAfterDuration: 'PT1S',
      },
      { signal: controller.signal },
    );

    await expectStates(
      { status: 'idle', startsAt: timestamp },
      { status: 'initial-wait' },
    );

    // Start, complete successfully
    await expectStates(
      { status: 'running', startedAt: timestamp, timesOutAt: timestamp },
      { status: 'running' },
    );
    await expectStates(
      { status: 'idle', startsAt: timestamp, lastRunEndedAt: timestamp },
      { status: 'idle' },
    );

    // Start, complete with error
    await expectStates(
      {
        status: 'running',
        startedAt: timestamp,
        timesOutAt: timestamp,
        lastRunEndedAt: timestamp,
      },
      { status: 'running' },
    );
    await expectStates(
      {
        status: 'idle',
        startsAt: timestamp,
        lastRunEndedAt: timestamp,
        lastRunError: expect.any(String),
      },
      { status: 'idle' },
    );

    // Start, complete successfully. While this run is in progress the worker
    // still reports the error of the previous run, since the last-run fields
    // are carried over until the current run has ended. Expecting it here
    // pins this assertion to the run directly after the failure, instead of
    // it only being satisfiable by some later run.
    await expectStates(
      {
        status: 'running',
        startedAt: timestamp,
        timesOutAt: timestamp,
        lastRunEndedAt: timestamp,
        lastRunError: expect.any(String),
      },
      { status: 'running' },
    );
    await expectStates(
      { status: 'idle', startsAt: timestamp, lastRunEndedAt: timestamp },
      { status: 'idle' },
    );
  } finally {
    // Stop the worker loop even when an expectation fails, so that its
    // timers do not outlive the test
    controller.abort();
  }
});
});
@@ -19,8 +19,8 @@ import { SchedulerServiceTaskFunction } from '@backstage/backend-plugin-api';
import { ConflictError } from '@backstage/errors';
import { CronTime } from 'cron';
import { DateTime, Duration } from 'luxon';
import { TaskSettingsV2 } from './types';
import { delegateAbortController, sleep } from './util';
import { TaskSettingsV2, TaskApiTasksResponse } from './types';
import { delegateAbortController, serializeError, sleep } from './util';
/**
* Implements tasks that run locally without cross-host collaboration.
@@ -29,6 +29,12 @@ import { delegateAbortController, sleep } from './util';
*/
export class LocalTaskWorker {
private abortWait: AbortController | undefined;
#taskState: Exclude<TaskApiTasksResponse['taskState'], null> = {
status: 'idle',
};
#workerState: TaskApiTasksResponse['workerState'] = {
status: 'idle',
};
constructor(
private readonly taskId: string,
@@ -45,12 +51,7 @@ export class LocalTaskWorker {
let attemptNum = 1;
for (;;) {
try {
if (settings.initialDelayDuration) {
await this.sleep(
Duration.fromISO(settings.initialDelayDuration),
options.signal,
);
}
await this.performInitialWait(settings, options.signal);
while (!options.signal.aborted) {
const startTime = process.hrtime();
@@ -84,6 +85,38 @@ export class LocalTaskWorker {
this.abortWait.abort();
}
/**
 * Returns the task state most recently recorded by this worker. It starts
 * out as idle and is replaced as the worker waits for and performs runs.
 */
taskState(): TaskApiTasksResponse['taskState'] {
return this.#taskState;
}
/**
 * Returns what this worker was most recently recorded to be doing: idle,
 * in its initial wait, or running the task.
 */
workerState(): TaskApiTasksResponse['workerState'] {
return this.#workerState;
}
/**
 * Performs the once-at-startup initial wait, if the settings ask for one.
 *
 * For the duration of the wait, the task is reported as idle with its
 * expected start time, and the worker as being in its initial wait. Does
 * nothing at all when no initial delay is configured.
 */
private async performInitialWait(
  settings: TaskSettingsV2,
  signal: AbortSignal,
): Promise<void> {
  const { initialDelayDuration } = settings;
  if (!initialDelayDuration) {
    return;
  }

  const delay = Duration.fromISO(initialDelayDuration);
  const expectedStart = DateTime.utc().plus(delay).toISO()!;

  // Keep whatever is known about the previous run
  const { lastRunEndedAt, lastRunError } = this.#taskState;
  this.#taskState = {
    status: 'idle',
    startsAt: expectedStart,
    lastRunEndedAt,
    lastRunError,
  };
  this.#workerState = { status: 'initial-wait' };

  await this.sleep(delay, signal);
}
/**
* Makes a single attempt at running the task to completion.
*/
@@ -94,14 +127,29 @@ export class LocalTaskWorker {
// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
const taskAbortController = delegateAbortController(signal);
const timeoutDuration = Duration.fromISO(settings.timeoutAfterDuration);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
}, Duration.fromISO(settings.timeoutAfterDuration).as('milliseconds'));
}, timeoutDuration.as('milliseconds'));
this.#taskState = {
status: 'running',
startedAt: DateTime.utc().toISO()!,
timesOutAt: DateTime.utc().plus(timeoutDuration).toISO()!,
lastRunEndedAt: this.#taskState.lastRunEndedAt,
lastRunError: this.#taskState.lastRunError,
};
this.#workerState = {
status: 'running',
};
try {
await this.fn(taskAbortController.signal);
this.#taskState.lastRunEndedAt = DateTime.utc().toISO()!;
this.#taskState.lastRunError = undefined;
} catch (e) {
// ignore intentionally
this.#taskState.lastRunEndedAt = DateTime.utc().toISO()!;
this.#taskState.lastRunError = serializeError(e);
}
// release resources
@@ -133,11 +181,20 @@ export class LocalTaskWorker {
}
dt = Math.max(dt, 0);
const startsAt = DateTime.now().plus(Duration.fromMillis(dt));
this.#taskState = {
status: 'idle',
startsAt: startsAt.toISO()!,
lastRunEndedAt: this.#taskState.lastRunEndedAt,
lastRunError: this.#taskState.lastRunError,
};
this.#workerState = {
status: 'idle',
};
this.logger.debug(
`task: ${this.taskId} will next occur around ${DateTime.now().plus(
Duration.fromMillis(dt),
)}`,
`task: ${this.taskId} will next occur around ${startsAt}`,
);
await this.sleep(Duration.fromMillis(dt), signal);
@@ -53,6 +53,7 @@ describe('PluginTaskManagerImpl', () => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const manager = new PluginTaskSchedulerImpl(
'myplugin',
async () => knex,
mockServices.logger.mock(),
{
@@ -27,9 +27,11 @@ import {
import { Counter, Histogram, Gauge, metrics, trace } from '@opentelemetry/api';
import { Knex } from 'knex';
import { Duration } from 'luxon';
import express from 'express';
import Router from 'express-promise-router';
import { LocalTaskWorker } from './LocalTaskWorker';
import { TaskWorker } from './TaskWorker';
import { TaskSettingsV2 } from './types';
import { TaskSettingsV2, TaskApiTasksResponse } from './types';
import { delegateAbortController, TRACER_ID, validateId } from './util';
const tracer = trace.getTracer(TRACER_ID);
@@ -38,7 +40,8 @@ const tracer = trace.getTracer(TRACER_ID);
* Implements the actual task management.
*/
export class PluginTaskSchedulerImpl implements SchedulerService {
private readonly localTasksById = new Map<string, LocalTaskWorker>();
private readonly localWorkersById = new Map<string, LocalTaskWorker>();
private readonly globalWorkersById = new Map<string, TaskWorker>();
private readonly allScheduledTasks: SchedulerServiceTaskDescriptor[] = [];
private readonly shutdownInitiated: Promise<boolean>;
@@ -48,6 +51,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
private readonly lastCompleted: Gauge;
constructor(
private readonly pluginId: string,
private readonly databaseFactory: () => Promise<Knex>,
private readonly logger: LoggerService,
rootLifecycle?: RootLifecycleService,
@@ -77,7 +81,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
}
async triggerTask(id: string): Promise<void> {
const localTask = this.localTasksById.get(id);
const localTask = this.localWorkersById.get(id);
if (localTask) {
localTask.trigger();
return;
@@ -116,6 +120,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
this.logger.child({ task: task.id }),
);
await worker.start(settings, { signal: abortController.signal });
this.globalWorkersById.set(task.id, worker);
} else {
const worker = new LocalTaskWorker(
task.id,
@@ -123,7 +128,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
this.logger.child({ task: task.id }),
);
worker.start(settings, { signal: abortController.signal });
this.localTasksById.set(task.id, worker);
this.localWorkersById.set(task.id, worker);
}
this.allScheduledTasks.push({
@@ -147,6 +152,47 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
return this.allScheduledTasks;
}
/**
 * Creates a router that exposes the tasks scheduled through this instance:
 *
 * - `GET /.backstage/scheduler/v1/tasks` lists them along with their state
 * - `POST /.backstage/scheduler/v1/tasks/:id/trigger` triggers one of them
 */
getRouter(): express.Router {
  const router = Router();

  router.get('/.backstage/scheduler/v1/tasks', async (_, res) => {
    const knex = await this.databaseFactory();
    const globalState = await TaskWorker.taskStates(knex);

    const tasks = this.allScheduledTasks.map((task): TaskApiTasksResponse => {
      const localWorker = this.localWorkersById.get(task.id);
      const globalWorker = this.globalWorkersById.get(task.id);
      return {
        taskId: task.id,
        pluginId: this.pluginId,
        scope: task.scope,
        settings: task.settings,
        // A local worker tracks the task state itself; otherwise fall back
        // to the state that was read from the database
        taskState:
          localWorker?.taskState() ?? globalState.get(task.id) ?? null,
        workerState:
          localWorker?.workerState() ?? globalWorker?.workerState() ?? null,
      };
    });

    res.json({ tasks });
  });

  router.post(
    '/.backstage/scheduler/v1/tasks/:id/trigger',
    async (req, res) => {
      await this.triggerTask(req.params.id);
      res.status(200).end();
    },
  );

  return router;
}
private instrumentedFunction(
task: SchedulerServiceTaskInvocationDefinition,
scope: string,
@@ -87,6 +87,8 @@ describe('PluginTaskSchedulerJanitor', () => {
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
last_run_ended_at: expect.anything(),
last_run_error_json: expect.stringContaining('Task timed out'),
}),
);
});
@@ -18,7 +18,7 @@ import { LoggerService } from '@backstage/backend-plugin-api';
import { Knex } from 'knex';
import { Duration } from 'luxon';
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
import { sleep } from './util';
import { serializeError, sleep } from './util';
/**
* Makes sure to auto-expire and clean up things that time out or for other
@@ -69,6 +69,8 @@ export class PluginTaskSchedulerJanitor {
current_run_ticket: dbNull,
current_run_started_at: dbNull,
current_run_expires_at: dbNull,
last_run_ended_at: this.knex.fn.now(),
last_run_error_json: serializeError(new Error('Task timed out')),
});
} else {
tasks = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
@@ -77,6 +79,8 @@ export class PluginTaskSchedulerJanitor {
current_run_ticket: dbNull,
current_run_started_at: dbNull,
current_run_expires_at: dbNull,
last_run_ended_at: this.knex.fn.now(),
last_run_error_json: serializeError(new Error('Task timed out')),
})
.returning(['id']);
}
@@ -68,6 +68,17 @@ describe('TaskWorker', () => {
initialDelayDuration: 'PT1S',
timeoutAfterDuration: 'PT1M',
});
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
new Map([
[
'task1',
{
status: 'idle',
startsAt: expect.anything(),
},
],
]),
);
await expect(worker.findReadyTask()).resolves.toEqual({
result: 'not-ready-yet',
@@ -101,6 +112,18 @@ describe('TaskWorker', () => {
current_run_expires_at: expect.anything(),
}),
);
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
new Map([
[
'task1',
{
status: 'running',
startedAt: expect.anything(),
timesOutAt: expect.anything(),
},
],
]),
);
await expect(worker.tryReleaseTask('ticket', settings)).resolves.toBe(
true,
@@ -115,6 +138,18 @@ describe('TaskWorker', () => {
current_run_expires_at: null,
}),
);
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
new Map([
[
'task1',
{
status: 'idle',
startsAt: expect.anything(),
lastRunEndedAt: expect.anything(),
},
],
]),
);
},
);
@@ -136,8 +171,21 @@ describe('TaskWorker', () => {
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
worker.start(settings, { signal: testScopedSignal() });
await waitForExpect(() => {
await waitForExpect(async () => {
expect(logger.error).toHaveBeenCalled();
await expect(TaskWorker.taskStates(knex)).resolves.toEqual(
new Map([
[
'task1',
{
status: 'idle',
startsAt: expect.anything(),
lastRunEndedAt: expect.anything(),
lastRunError: expect.stringContaining('failed'),
},
],
]),
);
});
},
);
@@ -373,7 +421,8 @@ describe('TaskWorker', () => {
// contrived check removes a test flakiness based on wall clock time.
expect(
Math.abs(
+new Date(row3.next_run_start_at) - +new Date(row2.next_run_start_at),
+new Date(row3.next_run_start_at!) -
+new Date(row2.next_run_start_at!),
),
).toBeLessThanOrEqual(60_000);
@@ -416,10 +465,10 @@ describe('TaskWorker', () => {
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
new Date(rowAfterClaimAndRelease.next_run_start_at),
new Date(rowAfterClaimAndRelease.next_run_start_at!),
);
const row1NextStartAt = DateTime.fromJSDate(
new Date(row1.next_run_start_at),
new Date(row1.next_run_start_at!),
);
const now = DateTime.now();
expect(
@@ -478,10 +527,10 @@ describe('TaskWorker', () => {
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
new Date(rowAfterClaimAndRelease.next_run_start_at),
new Date(rowAfterClaimAndRelease.next_run_start_at!),
);
const row1NextStartAt = DateTime.fromJSDate(
new Date(row1.next_run_start_at),
new Date(row1.next_run_start_at!),
);
const now = DateTime.now();
expect(
@@ -21,8 +21,18 @@ import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
import { v4 as uuid } from 'uuid';
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
import { TaskSettingsV2, taskSettingsV2Schema } from './types';
import { delegateAbortController, nowPlus, sleep } from './util';
import {
TaskSettingsV2,
taskSettingsV2Schema,
TaskApiTasksResponse,
} from './types';
import {
delegateAbortController,
nowPlus,
sleep,
dbTime,
serializeError,
} from './util';
import { SchedulerServiceTaskFunction } from '@backstage/backend-plugin-api';
const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
@@ -33,6 +43,10 @@ const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
* @private
*/
export class TaskWorker {
#workerState: TaskApiTasksResponse['workerState'] = {
status: 'idle',
};
constructor(
private readonly taskId: string,
private readonly fn: SchedulerServiceTaskFunction,
@@ -61,24 +75,17 @@ export class TaskWorker {
}
}
let attemptNum = 1;
(async () => {
let attemptNum = 1;
for (;;) {
try {
if (settings.initialDelayDuration) {
await sleep(
Duration.fromISO(settings.initialDelayDuration),
options.signal,
);
}
await this.performInitialWait(settings, options.signal);
while (!options.signal.aborted) {
const runResult = await this.runOnce(options.signal);
if (runResult.result === 'abort') {
break;
}
await sleep(workCheckFrequency, options.signal);
}
@@ -96,6 +103,24 @@ export class TaskWorker {
})();
}
/**
 * Performs the once-at-startup initial wait, if the settings ask for one.
 *
 * The worker reports itself as being in its initial wait while sleeping, and
 * as idle once this method is done - whether or not there was a delay.
 */
private async performInitialWait(
  settings: TaskSettingsV2,
  signal: AbortSignal,
): Promise<void> {
  const { initialDelayDuration } = settings;

  if (initialDelayDuration) {
    this.#workerState = { status: 'initial-wait' };
    const delay = Duration.fromISO(initialDelayDuration);
    await sleep(delay, signal);
  }

  this.#workerState = { status: 'idle' };
}
static async trigger(knex: Knex, taskId: string): Promise<void> {
// check if task exists
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE)
@@ -116,6 +141,51 @@ export class TaskWorker {
}
}
/**
 * Reads every row of the tasks table and translates it to the task state
 * shape of the REST API, keyed by task ID.
 *
 * A row that has a current run start time is reported as running; all other
 * rows are reported as idle.
 */
static async taskStates(
  knex: Knex,
): Promise<Map<string, TaskApiTasksResponse['taskState']>> {
  // Timestamp columns may be unset; only the ones present become ISO strings
  const toIso = (value: Date | string | null | undefined) =>
    value ? dbTime(value).toISO()! : undefined;

  const states = new Map<string, TaskApiTasksResponse['taskState']>();
  const rows = await knex<DbTasksRow>(DB_TASKS_TABLE);

  for (const row of rows) {
    const startedAt = toIso(row.current_run_started_at);
    const timesOutAt = toIso(row.current_run_expires_at);
    const startsAt = toIso(row.next_run_start_at);
    const lastRun = {
      lastRunEndedAt: toIso(row.last_run_ended_at),
      lastRunError: row.last_run_error_json || undefined,
    };

    if (startedAt) {
      states.set(row.id, {
        status: 'running',
        startedAt,
        timesOutAt,
        ...lastRun,
      });
    } else {
      states.set(row.id, {
        status: 'idle',
        startsAt,
        ...lastRun,
      });
    }
  }

  return states;
}
/**
 * Returns what this worker was most recently recorded to be doing: idle,
 * in its initial wait, or running the task.
 */
workerState(): TaskApiTasksResponse['workerState'] {
return this.#workerState;
}
/**
* Makes a single attempt at running the task to completion, if ready.
*
@@ -153,13 +223,19 @@ export class TaskWorker {
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
try {
this.#workerState = {
status: 'running',
};
await this.fn(taskAbortController.signal);
taskAbortController.abort(); // releases resources
} catch (e) {
this.logger.error(e);
await this.tryReleaseTask(ticket, taskSettings);
await this.tryReleaseTask(ticket, taskSettings, e);
return { result: 'failed' };
} finally {
this.#workerState = {
status: 'idle',
};
clearTimeout(timeoutHandle);
}
@@ -321,6 +397,7 @@ export class TaskWorker {
async tryReleaseTask(
ticket: string,
settings: TaskSettingsV2,
error?: Error,
): Promise<boolean> {
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
@@ -366,6 +443,10 @@ export class TaskWorker {
current_run_ticket: this.knex.raw('null'),
current_run_started_at: this.knex.raw('null'),
current_run_expires_at: this.knex.raw('null'),
last_run_ended_at: this.knex.fn.now(),
last_run_error_json: error
? serializeError(error)
: this.knex.raw('null'),
});
return rows === 1;
@@ -14,6 +14,7 @@
* limitations under the License.
*/
import { JsonObject } from '@backstage/types';
import { CronTime } from 'cron';
import { Duration } from 'luxon';
import { z } from 'zod';
@@ -97,3 +98,39 @@ export const taskSettingsV2Schema = z.object({
* The properties that control a scheduled task (version 2).
*/
export type TaskSettingsV2 = z.infer<typeof taskSettingsV2Schema>;
/**
 * The shape of a task definition as returned by the service's REST API.
 *
 * All timestamps are strings; the producers in this change emit ISO 8601.
 */
export interface TaskApiTasksResponse {
// The ID that the task was scheduled under
taskId: string;
// The ID of the plugin whose scheduler owns the task
pluginId: string;
// Whether the task is coordinated across hosts or runs locally
scope: 'global' | 'local';
// The versioned settings that the task was scheduled with
settings: { version: number } & JsonObject;
// The state of the task itself, or null if no state is known for it
taskState:
| {
status: 'running';
// When the ongoing run started
startedAt: string;
// When the ongoing run will be considered timed out
timesOutAt?: string;
// Serialized error of the previous run, if it failed
lastRunError?: string;
// When the previous run ended
lastRunEndedAt?: string;
}
| {
status: 'idle';
// When the next run is expected to start
startsAt?: string;
// Serialized error of the previous run, if it failed
lastRunError?: string;
// When the previous run ended
lastRunEndedAt?: string;
}
| null;
// What the worker on the responding host is doing, or null if that host
// has no worker for the task
workerState:
| {
status: 'initial-wait';
}
| {
status: 'idle';
}
| {
status: 'running';
}
| null;
}
@@ -14,7 +14,10 @@
* limitations under the License.
*/
import { InputError } from '@backstage/errors';
import {
InputError,
serializeError as internalSerializeError,
} from '@backstage/errors';
import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
@@ -111,3 +114,11 @@ export function delegateAbortController(parent?: AbortSignal): AbortController {
return delegate;
}
/**
 * Serializes an error to a JSON string.
 *
 * The stack trace is only included when `NODE_ENV` is `development`.
 *
 * @param error - The error to serialize
 * @returns The JSON string form of the serialized error
 */
export function serializeError(error: Error): string {
  const includeStack = process.env.NODE_ENV === 'development';
  const serialized = internalSerializeError(error, { includeStack });
  return JSON.stringify(serialized);
}
@@ -15,12 +15,17 @@
*/
import { coreServices } from '@backstage/backend-plugin-api';
import { ServiceFactoryTester } from '@backstage/backend-test-utils';
import {
mockServices,
ServiceFactoryTester,
} from '@backstage/backend-test-utils';
import { schedulerServiceFactory } from './schedulerServiceFactory';
describe('schedulerFactory', () => {
it('creates sidecar database features', async () => {
const tester = ServiceFactoryTester.from(schedulerServiceFactory);
const tester = ServiceFactoryTester.from(schedulerServiceFactory, {
dependencies: [mockServices.rootHttpRouter.factory()],
});
const scheduler = await tester.getSubject();
await scheduler.scheduleTask({
@@ -35,8 +35,22 @@ export const schedulerServiceFactory = createServiceFactory({
database: coreServices.database,
logger: coreServices.logger,
rootLifecycle: coreServices.rootLifecycle,
httpRouter: coreServices.httpRouter,
pluginMetadata: coreServices.pluginMetadata,
},
async factory({ database, logger, rootLifecycle }) {
return DefaultSchedulerService.create({ database, logger, rootLifecycle });
async factory({
database,
logger,
rootLifecycle,
httpRouter,
pluginMetadata,
}) {
return DefaultSchedulerService.create({
database,
logger,
rootLifecycle,
httpRouter,
pluginMetadata,
});
},
});
@@ -82,13 +82,12 @@ export interface SchedulerServiceTaskScheduleDefinition {
* Overview:
*
* ```
* second (optional)
* minute
* hour
* day of month
* month
* day of week
*
* second (0-59, optional)
* minute (0-59)
* hour (0-23)
* day of month (1-31)
* month (1-12)
* day of week (0-6, 0 is Sunday)
*
* * * * * * *
* ```
@@ -180,13 +179,12 @@ export interface SchedulerServiceTaskScheduleDefinitionConfig {
* Overview:
*
* ```
* second (optional)
* minute
* hour
* day of month
* month
* day of week
*
* second (0-59, optional)
* minute (0-59)
* hour (0-23)
* day of month (1-31)
* month (1-12)
* day of week (0-6, 0 is Sunday)
*
* * * * * * *
* ```
@@ -232,11 +232,7 @@ export class InternalOpenApiDocumentationProvider implements EntityProvider {
taskId,
taskInstanceId: uuid.v4(),
});
try {
await this.refresh(logger);
} catch (error) {
logger.error(`${this.getProviderName()} refresh failed`, error);
}
await this.refresh(logger);
},
});
};
@@ -72,6 +72,5 @@ describe('catalogModuleIncrementalIngestionEntityProvider', () => {
expect(addEntityProvider.mock.calls[0][0].getProviderName()).toBe(
'provider1',
);
expect(httpRouterMock.use).toHaveBeenCalledTimes(1);
});
});