refactor: delete the backend tasks package

Signed-off-by: Camila Belo <camilaibs@gmail.com>
This commit is contained in:
Camila Belo
2024-08-28 12:51:48 +02:00
parent 4b5aa20dff
commit bf370c2347
38 changed files with 6 additions and 5080 deletions
-1
View File
@@ -15,7 +15,6 @@
"example-backend-legacy": "0.2.101",
"@backstage/backend-openapi-utils": "0.1.16",
"@backstage/backend-plugin-api": "0.8.0",
"@backstage/backend-tasks": "0.6.0",
"@backstage/backend-test-utils": "0.5.0",
"@backstage/catalog-client": "1.6.6",
"@backstage/catalog-model": "1.6.0",
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/create-app': patch
---
Remove references to the `@backstage/backend-tasks` in versions of the `create-app` package, as it has been deprecated.
@@ -159,7 +159,7 @@ The same applies for modules that perform their own migrations and interact with
the database. They will run on the same logical database instance as the target
plugin, so care must be taken to choose table names that do not risk colliding
with those of the plugin. A recommended naming pattern is `<package
name>__<table name>`, for example the `@backstage/backend-tasks` package creates
name>__<table name>`, for example the `Scheduler Service` package creates
tables named `backstage_backend_tasks__<table>`. If you use the default [`Knex` migration facilities](https://knexjs.org/guide/migrations.html), you will also
want to make sure that it uses similarly prefixed migration state tables for its
internal bookkeeping needs, so they do not collide with the main ones used by
-1
View File
@@ -1 +0,0 @@
module.exports = require('@backstage/cli/config/eslint-factory')(__dirname);
File diff suppressed because it is too large Load Diff
-36
View File
@@ -1,36 +0,0 @@
# @backstage/backend-tasks
> [!CAUTION]
> This package is deprecated and will be removed in a near future.
Common distributed task management for Backstage backends.
## Usage
> [!CAUTION]
> Please note that the documentation below is only valid for versions equal to or below `0.5.28-next.3`.
> As this package will be deleted soon, we recommend that you migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`. Here are the [backend](https://backstage.io/docs/backend-system/building-backends/migrating) and [plugin](https://backstage.io/docs/backend-system/building-plugins-and-modules/migrating) migration guides.
Add the library to your backend package:
```bash
# From your Backstage root directory
yarn --cwd packages/backend add @backstage/backend-tasks
```
then make use of its facilities as necessary:
```typescript
import { TaskScheduler } from '@backstage/backend-tasks';
const scheduler = TaskScheduler.fromConfig(rootConfig).forPlugin('my-plugin');
await scheduler.scheduleTask({
id: 'refresh_things',
frequency: { cron: '*/5 * * * *' }, // every 5 minutes, also supports Duration
timeout: { minutes: 15 },
fn: async () => {
await entityProvider.run();
},
});
```
-114
View File
@@ -1,114 +0,0 @@
## API Report File for "@backstage/backend-tasks"
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
```ts
import { Config } from '@backstage/config';
import { Duration } from 'luxon';
import { HumanDuration as HumanDuration_2 } from '@backstage/types';
import { JsonObject } from '@backstage/types';
import { LegacyRootDatabaseService } from '@backstage/backend-common';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginDatabaseManager } from '@backstage/backend-common';
// @public @deprecated
export type HumanDuration = HumanDuration_2;
// @public @deprecated
export interface PluginTaskScheduler {
createScheduledTaskRunner(schedule: TaskScheduleDefinition): TaskRunner;
getScheduledTasks(): Promise<TaskDescriptor[]>;
scheduleTask(
task: TaskScheduleDefinition & TaskInvocationDefinition,
): Promise<void>;
triggerTask(id: string): Promise<void>;
}
// @public @deprecated
export function readTaskScheduleDefinitionFromConfig(
config: Config,
): TaskScheduleDefinition;
// @public @deprecated
export type TaskDescriptor = {
id: string;
scope: 'global' | 'local';
settings: {
version: number;
} & JsonObject;
};
// @public @deprecated
export type TaskFunction =
| ((abortSignal: AbortSignal) => void | Promise<void>)
| (() => void | Promise<void>);
// @public @deprecated
export interface TaskInvocationDefinition {
fn: TaskFunction;
id: string;
signal?: AbortSignal;
}
// @public @deprecated
export interface TaskRunner {
run(task: TaskInvocationDefinition): Promise<void>;
}
// @public @deprecated
export interface TaskScheduleDefinition {
frequency:
| {
cron: string;
}
| Duration
| HumanDuration_2
/**
* This task will only run when manually triggered with the `triggerTask` method; no automatic
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
*/
| {
trigger: 'manual';
};
initialDelay?: Duration | HumanDuration_2;
scope?: 'global' | 'local';
timeout: Duration | HumanDuration_2;
}
// @public @deprecated
export interface TaskScheduleDefinitionConfig {
frequency:
| {
cron: string;
}
| string
| HumanDuration_2;
initialDelay?: string | HumanDuration_2;
scope?: 'global' | 'local';
timeout: string | HumanDuration_2;
}
// @public @deprecated
export class TaskScheduler {
constructor(
databaseManager: LegacyRootDatabaseService,
logger: LoggerService,
);
// @deprecated
forPlugin(pluginId: string): PluginTaskScheduler;
// @deprecated (undocumented)
static forPlugin(opts: {
pluginId: string;
databaseManager: PluginDatabaseManager;
logger: LoggerService;
}): PluginTaskScheduler;
// @deprecated (undocumented)
static fromConfig(
config: Config,
options?: {
databaseManager?: LegacyRootDatabaseService;
logger?: LoggerService;
},
): TaskScheduler;
}
```
-10
View File
@@ -1,10 +0,0 @@
apiVersion: backstage.io/v1alpha1
kind: Component
metadata:
name: backstage-backend-tasks
title: '@backstage/backend-tasks'
description: Common distributed task management library for Backstage backends
spec:
lifecycle: experimental
type: backstage-node-library
owner: maintainers
-2
View File
@@ -1,2 +0,0 @@
# Knip report
@@ -1,64 +0,0 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @ts-check
/**
* @param {import('knex').Knex} knex
*/
exports.up = async function up(knex) {
//
// tasks
//
await knex.schema.createTable('backstage_backend_tasks__tasks', table => {
table.comment('Tasks used for scheduling work on multiple workers');
table
.string('id')
.primary()
.notNullable()
.comment('The unique ID of this particular task');
table
.text('settings_json')
.notNullable()
.comment('JSON serialized object with properties for this task');
table
.dateTime('next_run_start_at')
.notNullable()
.comment('The next time that the task should be started');
table
.text('current_run_ticket')
.nullable()
.comment('A unique ticket for the current task run');
table
.dateTime('current_run_started_at')
.nullable()
.comment('The time that the current task run started');
table
.dateTime('current_run_expires_at')
.nullable()
.comment('The time that the current task run will time out');
});
};
/**
* @param {import('knex').Knex} knex
*/
exports.down = async function down(knex) {
//
// tasks
//
await knex.schema.dropTable('backstage_backend_tasks__tasks');
};
@@ -1,41 +0,0 @@
/*
* Copyright 2024 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @ts-check
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = async function up(knex) {
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
table.setNullable('next_run_start_at');
});
};
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = async function down(knex) {
await knex
.delete()
.from('backstage_backend_tasks__tasks')
.where({ next_run_start_at: null });
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
table.dropNullable('next_run_start_at');
});
};
-58
View File
@@ -1,58 +0,0 @@
{
"name": "@backstage/backend-tasks",
"version": "0.6.2-next.0",
"description": "Common distributed task management library for Backstage backends",
"backstage": {
"role": "node-library"
},
"publishConfig": {
"access": "public",
"main": "dist/index.cjs.js",
"types": "dist/index.d.ts"
},
"keywords": [
"backstage"
],
"homepage": "https://backstage.io",
"repository": {
"type": "git",
"url": "https://github.com/backstage/backstage",
"directory": "packages/backend-tasks"
},
"license": "Apache-2.0",
"main": "src/index.ts",
"types": "src/index.ts",
"files": [
"dist",
"migrations/**/*.{js,d.ts}"
],
"scripts": {
"build": "backstage-cli package build",
"clean": "backstage-cli package clean",
"lint": "backstage-cli package lint",
"prepack": "backstage-cli package prepack",
"postpack": "backstage-cli package postpack",
"start": "backstage-cli package start",
"test": "backstage-cli package test"
},
"dependencies": {
"@backstage/backend-common": "workspace:^",
"@backstage/backend-plugin-api": "workspace:^",
"@backstage/config": "workspace:^",
"@backstage/errors": "workspace:^",
"@backstage/types": "workspace:^",
"@opentelemetry/api": "^1.3.0",
"@types/luxon": "^3.0.0",
"cron": "^3.0.0",
"knex": "^3.0.0",
"lodash": "^4.17.21",
"luxon": "^3.0.0",
"uuid": "^9.0.0",
"zod": "^3.22.4"
},
"devDependencies": {
"@backstage/backend-test-utils": "workspace:^",
"@backstage/cli": "workspace:^",
"wait-for-expect": "^3.0.2"
}
}
@@ -1,31 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { resolvePackagePath } from '@backstage/backend-plugin-api';
import { Knex } from 'knex';
import { DB_MIGRATIONS_TABLE } from './tables';
export async function migrateBackendTasks(knex: Knex): Promise<void> {
const migrationsDir = resolvePackagePath(
'@backstage/backend-tasks',
'migrations',
);
await knex.migrate.latest({
directory: migrationsDir,
tableName: DB_MIGRATIONS_TABLE,
});
}
@@ -1,27 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export const DB_MIGRATIONS_TABLE = 'backstage_backend_tasks__knex_migrations';
export const DB_TASKS_TABLE = 'backstage_backend_tasks__tasks';
export type DbTasksRow = {
id: string;
settings_json: string;
next_run_start_at: Date;
current_run_ticket?: string;
current_run_started_at?: Date | string;
current_run_expires_at?: Date | string;
};
-27
View File
@@ -1,27 +0,0 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { HumanDuration as TypesHumanDuration } from '@backstage/types';
/**
* Human friendly durations object.
*
* @public
* @deprecated Import from `@backstage/types` instead
*/
export type HumanDuration = TypesHumanDuration;
export * from './tasks';
-27
View File
@@ -1,27 +0,0 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Common distributed task management library for Backstage backends
*
* @remarks
* This package is deprecated and will be removed in a near future.
* Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`.
*
* @packageDocumentation
*/
export * from './deprecated';
@@ -1,123 +0,0 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Knex } from 'knex';
import { TestDatabases } from '@backstage/backend-test-utils';
import fs from 'fs';
const migrationsDir = `${__dirname}/../migrations`;
const migrationsFiles = fs.readdirSync(migrationsDir).sort();
async function migrateUpOnce(knex: Knex): Promise<void> {
await knex.migrate.up({ directory: migrationsDir });
}
async function migrateDownOnce(knex: Knex): Promise<void> {
await knex.migrate.down({ directory: migrationsDir });
}
async function migrateUntilBefore(knex: Knex, target: string): Promise<void> {
const index = migrationsFiles.indexOf(target);
if (index === -1) {
throw new Error(`Migration ${target} not found`);
}
for (let i = 0; i < index; i++) {
await migrateUpOnce(knex);
}
}
jest.setTimeout(60_000);
describe('migrations', () => {
const databases = TestDatabases.create();
it.each(databases.eachSupportedId())(
'20210928160613_init.js, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateUntilBefore(knex, '20210928160613_init.js');
await migrateUpOnce(knex);
await knex('backstage_backend_tasks__tasks').insert({
id: 'test',
settings_json: '{}',
next_run_start_at: knex.fn.now(),
});
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
{
id: 'test',
settings_json: '{}',
next_run_start_at: expect.anything(),
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
},
]);
await migrateDownOnce(knex);
// This looks odd - you might expect a .toThrow at the end but that
// actually is flaky for some reason specifically on sqlite when
// performing multiple runs in sequence
await expect(knex('backstage_backend_tasks__tasks')).rejects.toEqual(
expect.anything(),
);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'20240712211735_nullable_next_run.js, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateUntilBefore(knex, '20240712211735_nullable_next_run.js');
await migrateUpOnce(knex);
await knex('backstage_backend_tasks__tasks').insert({
id: 'test',
settings_json: '{}',
next_run_start_at: knex.raw('null'),
});
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
{
id: 'test',
settings_json: '{}',
next_run_start_at: null,
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
},
]);
await migrateDownOnce(knex);
await expect(
knex('backstage_backend_tasks__tasks').insert({
id: 'test',
settings_json: '{}',
next_run_start_at: knex.raw('null'),
}),
).rejects.toEqual(expect.anything());
await knex.destroy();
},
);
});
-25
View File
@@ -1,25 +0,0 @@
/*
* Copyright 2020 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { TestDatabases } from '@backstage/backend-test-utils';
import { Settings } from 'luxon';
// TS still thinks that methods can return null / placeholders, but we still want to throw as soon as possible when things go wrong
Settings.throwOnInvalid = true;
TestDatabases.setDefaults({
ids: ['MYSQL_8', 'POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'],
});
@@ -1,109 +0,0 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { LocalTaskWorker } from './LocalTaskWorker';
import { mockServices } from '@backstage/backend-test-utils';
describe('LocalTaskWorker', () => {
const logger = mockServices.logger.mock();
it('runs the happy path (with iso duration) and handles cancellation', async () => {
const fn = jest.fn();
const controller = new AbortController();
const worker = new LocalTaskWorker('a', fn, logger);
worker.start(
{
version: 2,
initialDelayDuration: 'PT0.2S',
cadence: 'PT0.2S',
timeoutAfterDuration: 'PT1S',
},
{ signal: controller.signal },
);
// TODO(freben): Rewrite to fake timers - tried, but it wouldn't work
expect(fn).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 100));
expect(fn).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 200));
expect(fn).toHaveBeenCalledTimes(1);
await new Promise(r => setTimeout(r, 200));
expect(fn).toHaveBeenCalledTimes(2);
controller.abort();
await new Promise(r => setTimeout(r, 200));
expect(fn).toHaveBeenCalledTimes(2);
});
it('runs the happy path (with a cron expression) and handles cancellation', async () => {
const fn = jest.fn();
const controller = new AbortController();
// Await until system time is just past a second boundary (since cron is
// wall clock based)
await new Promise(r => setTimeout(r, 1000 - (Date.now() % 1000) + 10));
const worker = new LocalTaskWorker('a', fn, logger);
worker.start(
{
version: 2,
initialDelayDuration: 'PT0.2S',
cadence: '* * * * * *',
timeoutAfterDuration: 'PT1S',
},
{ signal: controller.signal },
);
// TODO(freben): Rewrite to fake timers - tried, but it wouldn't work
expect(fn).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 100));
expect(fn).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 200));
expect(fn).toHaveBeenCalledTimes(1);
await new Promise(r => setTimeout(r, 1000));
expect(fn).toHaveBeenCalledTimes(2);
controller.abort();
await new Promise(r => setTimeout(r, 1000));
expect(fn).toHaveBeenCalledTimes(2);
});
it('can trigger to abort wait', async () => {
const fn = jest.fn();
const controller = new AbortController();
const worker = new LocalTaskWorker('a', fn, logger);
worker.start(
{
version: 2,
initialDelayDuration: 'PT0.2S',
cadence: 'PT0.2S',
timeoutAfterDuration: 'PT1S',
},
{ signal: controller.signal },
);
// TODO(freben): Rewrite to fake timers - tried, but it wouldn't work
expect(fn).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 100));
expect(fn).toHaveBeenCalledTimes(0);
await new Promise(r => setTimeout(r, 200));
expect(fn).toHaveBeenCalledTimes(1);
worker.trigger();
await new Promise(r => setTimeout(r, 10));
expect(fn).toHaveBeenCalledTimes(2);
controller.abort();
});
});
@@ -1,154 +0,0 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConflictError } from '@backstage/errors';
import { CronTime } from 'cron';
import { DateTime, Duration } from 'luxon';
import { TaskFunction, TaskSettingsV2 } from './types';
import { delegateAbortController, sleep } from './util';
import { LoggerService } from '@backstage/backend-plugin-api';
/**
* Implements tasks that run locally without cross-host collaboration.
*
* @private
*/
export class LocalTaskWorker {
private abortWait: AbortController | undefined;
constructor(
private readonly taskId: string,
private readonly fn: TaskFunction,
private readonly logger: LoggerService,
) {}
start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
this.logger.info(
`Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`,
);
(async () => {
let attemptNum = 1;
for (;;) {
try {
if (settings.initialDelayDuration) {
await this.sleep(
Duration.fromISO(settings.initialDelayDuration),
options?.signal,
);
}
while (!options?.signal?.aborted) {
const startTime = process.hrtime();
await this.runOnce(settings, options?.signal);
const timeTaken = process.hrtime(startTime);
await this.waitUntilNext(
settings,
(timeTaken[0] + timeTaken[1] / 1e9) * 1000,
options?.signal,
);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
attemptNum = 0;
break;
} catch (e) {
attemptNum += 1;
this.logger.warn(
`Task worker failed unexpectedly, attempt number ${attemptNum}, ${e}`,
);
await sleep(Duration.fromObject({ seconds: 1 }));
}
}
})();
}
trigger(): void {
if (!this.abortWait) {
throw new ConflictError(`Task ${this.taskId} is currently running`);
}
this.abortWait.abort();
}
/**
* Makes a single attempt at running the task to completion.
*/
private async runOnce(
settings: TaskSettingsV2,
signal?: AbortSignal,
): Promise<void> {
// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
const taskAbortController = delegateAbortController(signal);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
}, Duration.fromISO(settings.timeoutAfterDuration).as('milliseconds'));
try {
await this.fn(taskAbortController.signal);
} catch (e) {
// ignore intentionally
}
// release resources
clearTimeout(timeoutHandle);
taskAbortController.abort();
}
/**
* Sleeps until it's time to run the task again.
*/
private async waitUntilNext(
settings: TaskSettingsV2,
lastRunMillis: number,
signal?: AbortSignal,
) {
if (signal?.aborted) {
return;
}
const isCron = !settings.cadence.startsWith('P');
let dt: number;
if (isCron) {
const nextRun = +new CronTime(settings.cadence).sendAt().toJSDate();
dt = nextRun - Date.now();
} else {
dt =
Duration.fromISO(settings.cadence).as('milliseconds') - lastRunMillis;
}
dt = Math.max(dt, 0);
this.logger.debug(
`task: ${this.taskId} will next occur around ${DateTime.now().plus(
Duration.fromMillis(dt),
)}`,
);
await this.sleep(Duration.fromMillis(dt), signal);
}
private async sleep(
duration: Duration,
abortSignal?: AbortSignal,
): Promise<void> {
this.abortWait = delegateAbortController(abortSignal);
await sleep(duration, this.abortWait.signal);
this.abortWait.abort(); // cleans up resources
this.abortWait = undefined;
}
}
@@ -1,351 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
TestDatabaseId,
TestDatabases,
mockServices,
} from '@backstage/backend-test-utils';
import { ConflictError, NotFoundError } from '@backstage/errors';
import { Duration } from 'luxon';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
import {
parseDuration,
PluginTaskSchedulerImpl,
} from './PluginTaskSchedulerImpl';
function defer() {
let resolve = () => {};
const promise = new Promise<void>(_resolve => {
resolve = _resolve;
});
return { promise, resolve };
}
jest.setTimeout(60_000);
describe('PluginTaskManagerImpl', () => {
const databases = TestDatabases.create({
ids: ['POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'],
});
beforeAll(async () => {
// Make sure all databases are running before mocking timers, in case of testcontainers
await Promise.all(
databases.eachSupportedId().map(([id]) => databases.init(id)),
);
jest.useFakeTimers();
}, 60_000);
async function init(databaseId: TestDatabaseId) {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const logger = mockServices.logger.mock();
const manager = new PluginTaskSchedulerImpl(async () => knex, logger);
return { knex, manager };
}
// This is just to test the wrapper code; most of the actual tests are in
// TaskWorker.test.ts
describe('scheduleTask with global scope', () => {
it.each(databases.eachSupportedId())(
'can run the v1 happy path, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
fn,
scope: 'global',
});
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
},
);
it.each(databases.eachSupportedId())(
'can run the v2 happy path, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task2',
timeout: Duration.fromMillis(5000),
frequency: { cron: '* * * * * *' },
fn,
scope: 'global',
});
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
},
);
});
describe('triggerTask with global scope', () => {
it.each(databases.eachSupportedId())(
'can manually trigger a task, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
initialDelay: Duration.fromObject({ years: 1 }),
fn,
scope: 'global',
});
await manager.triggerTask('task1');
jest.advanceTimersByTime(5000);
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
},
);
it.each(databases.eachSupportedId())(
'cant trigger a non-existent task, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
fn,
scope: 'global',
});
await expect(() => manager.triggerTask('task2')).rejects.toThrow(
NotFoundError,
);
},
);
it.each(databases.eachSupportedId())(
'cant trigger a running task, %p',
async databaseId => {
const { manager } = await init(databaseId);
const { promise, resolve } = defer();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
fn: async () => {
resolve();
await new Promise(r => setTimeout(r, 20000));
},
scope: 'global',
});
await promise;
await expect(() => manager.triggerTask('task1')).rejects.toThrow(
ConflictError,
);
},
);
});
// This is just to test the wrapper code; most of the actual tests are in
// TaskWorker.test.ts
describe('scheduleTask with local scope', () => {
it('can run the v1 happy path', async () => {
const { manager } = await init('SQLITE_3');
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task1',
timeout: { milliseconds: 5000 },
frequency: { milliseconds: 5000 },
fn,
scope: 'local',
});
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
}, 60_000);
it('can run the v2 happy path', async () => {
const { manager } = await init('SQLITE_3');
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task2',
timeout: Duration.fromMillis(5000),
frequency: { cron: '* * * * * *' },
fn,
scope: 'local',
});
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
}, 60_000);
});
describe('triggerTask with local scope', () => {
it('can manually trigger a task', async () => {
const { manager } = await init('SQLITE_3');
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
initialDelay: Duration.fromObject({ years: 1 }),
fn,
scope: 'local',
});
await manager.triggerTask('task1');
jest.advanceTimersByTime(5000);
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
}, 60_000);
it('cant trigger a non-existent task', async () => {
const { manager } = await init('SQLITE_3');
const fn = jest.fn();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
fn,
scope: 'local',
});
await expect(() => manager.triggerTask('task2')).rejects.toThrow(
NotFoundError,
);
}, 60_000);
it('cant trigger a running task', async () => {
const { manager } = await init('SQLITE_3');
const { promise, resolve } = defer();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromObject({ years: 1 }),
fn: async () => {
resolve();
await new Promise(r => setTimeout(r, 20000));
},
scope: 'local',
});
await promise;
await expect(() => manager.triggerTask('task1')).rejects.toThrow(
ConflictError,
);
}, 60_000);
});
// This is just to test the wrapper code; most of the actual tests are in
// TaskWorker.test.ts
describe('createScheduledTaskRunner', () => {
it.each(databases.eachSupportedId())(
'can run the happy path, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
const promise = new Promise(resolve => fn.mockImplementation(resolve));
await manager
.createScheduledTaskRunner({
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
scope: 'global',
})
.run({
id: 'task1',
fn,
});
await promise;
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
},
);
});
describe('can fetch task ids', () => {
it.each(databases.eachSupportedId())(
'can fetch both global and local task ids, %p',
async databaseId => {
const { manager } = await init(databaseId);
const fn = jest.fn();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
fn,
scope: 'global',
});
await manager.scheduleTask({
id: 'task2',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
fn,
scope: 'local',
});
await expect(manager.getScheduledTasks()).resolves.toEqual([
{
id: 'task1',
scope: 'global',
settings: expect.objectContaining({ cadence: 'PT5S' }),
},
{
id: 'task2',
scope: 'local',
settings: expect.objectContaining({ cadence: 'PT5S' }),
},
]);
},
);
});
describe('parseDuration', () => {
it('should parse durations', () => {
expect(parseDuration({ milliseconds: 5000 })).toEqual('PT5S');
expect(parseDuration(Duration.fromMillis(5000))).toEqual('PT5S');
expect(parseDuration({ cron: '1 * * * *' })).toEqual('1 * * * *');
expect(parseDuration({ trigger: 'manual' })).toEqual('manual');
});
});
});
@@ -1,170 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Knex } from 'knex';
import { Duration } from 'luxon';
import { LocalTaskWorker } from './LocalTaskWorker';
import { TaskWorker } from './TaskWorker';
import {
PluginTaskScheduler,
TaskDescriptor,
TaskFunction,
TaskInvocationDefinition,
TaskRunner,
TaskScheduleDefinition,
TaskSettingsV2,
} from './types';
import { validateId } from './util';
import { Counter, Histogram, metrics } from '@opentelemetry/api';
import { LoggerService } from '@backstage/backend-plugin-api';
/**
* Implements the actual task management.
*/
export class PluginTaskSchedulerImpl implements PluginTaskScheduler {
private readonly localTasksById = new Map<string, LocalTaskWorker>();
private readonly allScheduledTasks: TaskDescriptor[] = [];
private readonly counter: Counter;
private readonly duration: Histogram;
constructor(
private readonly databaseFactory: () => Promise<Knex>,
private readonly logger: LoggerService,
) {
const meter = metrics.getMeter('default');
this.counter = meter.createCounter('backend_tasks.task.runs.count', {
description: 'Total number of times a task has been run',
});
this.duration = meter.createHistogram('backend_tasks.task.runs.duration', {
description: 'Histogram of task run durations',
unit: 'seconds',
});
}
async triggerTask(id: string): Promise<void> {
const localTask = this.localTasksById.get(id);
if (localTask) {
localTask.trigger();
return;
}
const knex = await this.databaseFactory();
await TaskWorker.trigger(knex, id);
}
async scheduleTask(
task: TaskScheduleDefinition & TaskInvocationDefinition,
): Promise<void> {
validateId(task.id);
const scope = task.scope ?? 'global';
const settings: TaskSettingsV2 = {
version: 2,
cadence: parseDuration(task.frequency),
initialDelayDuration:
task.initialDelay && parseDuration(task.initialDelay),
timeoutAfterDuration: parseDuration(task.timeout),
};
if (scope === 'global') {
const knex = await this.databaseFactory();
const worker = new TaskWorker(
task.id,
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
knex,
this.logger.child({ task: task.id }),
);
await worker.start(settings, { signal: task.signal });
} else {
const worker = new LocalTaskWorker(
task.id,
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
this.logger.child({ task: task.id }),
);
worker.start(settings, { signal: task.signal });
this.localTasksById.set(task.id, worker);
}
this.allScheduledTasks.push({
id: task.id,
scope: scope,
settings: settings,
});
}
createScheduledTaskRunner(schedule: TaskScheduleDefinition): TaskRunner {
return {
run: async task => {
await this.scheduleTask({ ...task, ...schedule });
},
};
}
async getScheduledTasks(): Promise<TaskDescriptor[]> {
return this.allScheduledTasks;
}
private wrapInMetrics(
fn: TaskFunction,
opts: { labels: Record<string, string> },
): TaskFunction {
return async abort => {
const labels = {
...opts.labels,
};
this.counter.add(1, { ...labels, result: 'started' });
const startTime = process.hrtime();
try {
await fn(abort);
labels.result = 'completed';
} catch (ex) {
labels.result = 'failed';
throw ex;
} finally {
const delta = process.hrtime(startTime);
const endTime = delta[0] + delta[1] / 1e9;
this.counter.add(1, labels);
this.duration.record(endTime, labels);
}
};
}
}
export function parseDuration(
frequency: TaskScheduleDefinition['frequency'],
): string {
if ('cron' in frequency) {
return frequency.cron;
}
if ('trigger' in frequency) {
return frequency.trigger;
}
const parsed = Duration.isDuration(frequency)
? frequency
: Duration.fromObject(frequency);
if (!parsed.isValid) {
throw new Error(
`Invalid duration, ${parsed.invalidReason}: ${parsed.invalidExplanation}`,
);
}
return parsed.toISO()!;
}
@@ -1,95 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { TestDatabases, mockServices } from '@backstage/backend-test-utils';
import { Knex } from 'knex';
import { Duration } from 'luxon';
import waitForExpect from 'wait-for-expect';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { PluginTaskSchedulerJanitor } from './PluginTaskSchedulerJanitor';
import { createTestScopedSignal } from './__testUtils__/createTestScopedSignal';
const insertTask = async (knex: Knex, task: DbTasksRow) => {
return knex<DbTasksRow>(DB_TASKS_TABLE)
.insert(task)
.onConflict('id')
.merge(['settings_json']);
};
const getTask = async (knex: Knex): Promise<DbTasksRow> => {
return (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
};
describe('PluginTaskSchedulerJanitor', () => {
const logger = mockServices.logger.mock();
const databases = TestDatabases.create({
ids: [
/* 'MYSQL_8' not supported yet */
'POSTGRES_16',
'POSTGRES_12',
'SQLITE_3',
'MYSQL_8',
],
});
const testScopedSignal = createTestScopedSignal();
jest.setTimeout(60_000);
beforeEach(() => {
jest.resetAllMocks();
});
it.each(databases.eachSupportedId())(
'Should update date if current_run_expires_at expires, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const dateYesterday = new Date(
new Date().setDate(new Date().getDate() - 1),
);
await insertTask(knex, {
id: 'task1',
settings_json: '',
next_run_start_at: new Date('2023-03-01 00:00:00'),
current_run_ticket: 'ticket',
current_run_started_at: dateYesterday,
current_run_expires_at: dateYesterday,
});
const worker = new PluginTaskSchedulerJanitor({
waitBetweenRuns: Duration.fromObject({ milliseconds: 20 }),
knex,
logger,
});
worker.start(testScopedSignal());
await waitForExpect(async () => {
await expect(getTask(knex)).resolves.toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
}),
);
});
},
);
});
@@ -1,96 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Knex } from 'knex';
import { Duration } from 'luxon';
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
import { sleep } from './util';
import { LoggerService } from '@backstage/backend-plugin-api';
/**
* Makes sure to auto-expire and clean up things that time out or for other
* reasons should not be left lingering.
*/
export class PluginTaskSchedulerJanitor {
private readonly knex: Knex;
private readonly waitBetweenRuns: Duration;
private readonly logger: LoggerService;
constructor(options: {
knex: Knex;
waitBetweenRuns: Duration;
logger: LoggerService;
}) {
this.knex = options.knex;
this.waitBetweenRuns = options.waitBetweenRuns;
this.logger = options.logger;
}
async start(abortSignal?: AbortSignal) {
while (!abortSignal?.aborted) {
try {
await this.runOnce();
} catch (e) {
this.logger.warn(`Error while performing janitorial tasks, ${e}`);
}
await sleep(this.waitBetweenRuns, abortSignal);
}
}
private async runOnce() {
const dbNull = this.knex.raw('null');
const configClient = this.knex.client.config.client;
let tasks: Array<{ id: string }>;
if (configClient.includes('sqlite3') || configClient.includes('mysql')) {
tasks = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.select('id')
.where('current_run_expires_at', '<', this.knex.fn.now());
await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.whereIn(
'id',
tasks.map(t => t.id),
)
.update({
current_run_ticket: dbNull,
current_run_started_at: dbNull,
current_run_expires_at: dbNull,
});
} else {
tasks = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('current_run_expires_at', '<', this.knex.fn.now())
.update({
current_run_ticket: dbNull,
current_run_started_at: dbNull,
current_run_expires_at: dbNull,
})
.returning(['id']);
}
// In rare cases, knex drivers may ignore "returning", and return the number
// of rows changed instead
if (typeof tasks === 'number') {
if (tasks > 0) {
this.logger.warn(`${tasks} tasks timed out and were lost`);
}
} else {
for (const { id } of tasks) {
this.logger.warn(`Task timed out and was lost: ${id}`);
}
}
}
}
@@ -1,88 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { DatabaseManager } from '@backstage/backend-common';
import {
TestDatabaseId,
TestDatabases,
mockServices,
} from '@backstage/backend-test-utils';
import { Duration } from 'luxon';
import waitForExpect from 'wait-for-expect';
import { TaskScheduler } from './TaskScheduler';
import { createTestScopedSignal } from './__testUtils__/createTestScopedSignal';
jest.setTimeout(60_000);
describe('TaskScheduler', () => {
const logger = mockServices.logger.mock();
const databases = TestDatabases.create();
const testScopedSignal = createTestScopedSignal();
async function createDatabase(
databaseId: TestDatabaseId,
): Promise<DatabaseManager> {
const knex = await databases.init(databaseId);
const databaseManager: Partial<DatabaseManager> = {
forPlugin: () => ({
getClient: async () => knex,
}),
};
return databaseManager as DatabaseManager;
}
it.each(databases.eachSupportedId())(
'can return a working v1 plugin impl, %p',
async databaseId => {
const database = await createDatabase(databaseId);
const manager = new TaskScheduler(database, logger).forPlugin('test');
const fn = jest.fn();
await manager.scheduleTask({
id: 'task1',
timeout: Duration.fromMillis(5000),
frequency: Duration.fromMillis(5000),
signal: testScopedSignal(),
fn,
});
await waitForExpect(() => {
expect(fn).toHaveBeenCalled();
});
},
);
it.each(databases.eachSupportedId())(
'can return a working v2 plugin impl, %p',
async databaseId => {
const database = await createDatabase(databaseId);
const manager = new TaskScheduler(database, logger).forPlugin('test');
const fn = jest.fn();
await manager.scheduleTask({
id: 'task2',
timeout: Duration.fromMillis(5000),
frequency: { cron: '* * * * * *' },
signal: testScopedSignal(),
fn,
});
await waitForExpect(() => {
expect(fn).toHaveBeenCalled();
});
},
);
});
@@ -1,107 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
DatabaseManager,
getRootLogger,
LegacyRootDatabaseService,
PluginDatabaseManager,
} from '@backstage/backend-common';
import { Config } from '@backstage/config';
import { once } from 'lodash';
import { Duration } from 'luxon';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
import { PluginTaskSchedulerImpl } from './PluginTaskSchedulerImpl';
import { PluginTaskSchedulerJanitor } from './PluginTaskSchedulerJanitor';
import { PluginTaskScheduler } from './types';
import { LoggerService } from '@backstage/backend-plugin-api';
/**
* Deals with the scheduling of distributed tasks.
*
* @public
* @deprecated Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`
*/
export class TaskScheduler {
/**
* @deprecated
* It is only used by the legacy backend system, and should not be used in the new backend system.
*/
static fromConfig(
config: Config,
options?: {
databaseManager?: LegacyRootDatabaseService;
logger?: LoggerService;
},
): TaskScheduler {
const databaseManager =
options?.databaseManager ?? DatabaseManager.fromConfig(config);
const logger = (options?.logger || getRootLogger()).child({
type: 'taskManager',
});
return new TaskScheduler(databaseManager, logger);
}
constructor(
private readonly databaseManager: LegacyRootDatabaseService,
private readonly logger: LoggerService,
) {}
/**
* Instantiates a task manager instance for the given plugin.
*
* @param pluginId - The unique ID of the plugin, for example "catalog"
* @returns A {@link PluginTaskScheduler} instance
* @deprecated Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`
*/
forPlugin(pluginId: string): PluginTaskScheduler {
return TaskScheduler.forPlugin({
pluginId,
databaseManager: this.databaseManager.forPlugin(pluginId),
logger: this.logger,
});
}
/**
* @deprecated Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`
*/
static forPlugin(opts: {
pluginId: string;
databaseManager: PluginDatabaseManager;
logger: LoggerService;
}): PluginTaskScheduler {
const databaseFactory = once(async () => {
const knex = await opts.databaseManager.getClient();
if (!opts.databaseManager.migrations?.skip) {
await migrateBackendTasks(knex);
}
if (process.env.NODE_ENV !== 'test') {
const janitor = new PluginTaskSchedulerJanitor({
knex,
waitBetweenRuns: Duration.fromObject({ minutes: 1 }),
logger: opts.logger,
});
janitor.start();
}
return knex;
});
return new PluginTaskSchedulerImpl(databaseFactory, opts.logger);
}
}
@@ -1,534 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { TestDatabases, mockServices } from '@backstage/backend-test-utils';
import { Duration, DateTime } from 'luxon';
import waitForExpect from 'wait-for-expect';
import { migrateBackendTasks } from '../database/migrateBackendTasks';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { TaskWorker } from './TaskWorker';
import { TaskSettingsV2 } from './types';
import { createTestScopedSignal } from './__testUtils__/createTestScopedSignal';
jest.setTimeout(60_000);
describe('TaskWorker', () => {
const logger = mockServices.logger.mock();
const databases = TestDatabases.create();
const testScopedSignal = createTestScopedSignal();
beforeEach(() => {
jest.resetAllMocks();
});
it.each(databases.eachSupportedId())(
'goes through the expected states, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const settings: TaskSettingsV2 = {
version: 2,
cadence: '*/2 * * * * *',
initialDelayDuration: Duration.fromObject({ seconds: 1 }).toISO()!,
timeoutAfterDuration: Duration.fromObject({ minutes: 1 }).toISO()!,
};
const worker = new TaskWorker('task1', fn, knex, logger);
await worker.persistTask(settings);
let row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
}),
);
expect(JSON.parse(row.settings_json)).toEqual({
version: 2,
cadence: '*/2 * * * * *',
initialDelayDuration: 'PT1S',
timeoutAfterDuration: 'PT1M',
});
await expect(worker.findReadyTask()).resolves.toEqual({
result: 'not-ready-yet',
});
await waitForExpect(async () => {
await expect(worker.findReadyTask()).resolves.toEqual({
result: 'ready',
settings,
});
});
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
}),
);
await expect(worker.tryClaimTask('ticket', settings)).resolves.toBe(true);
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: 'ticket',
current_run_started_at: expect.anything(),
current_run_expires_at: expect.anything(),
}),
);
await expect(worker.tryReleaseTask('ticket', settings)).resolves.toBe(
true,
);
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: null,
current_run_started_at: null,
current_run_expires_at: null,
}),
);
},
);
it.each(databases.eachSupportedId())(
'logs error when the task throws, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
jest.spyOn(logger, 'error');
const fn = jest.fn().mockRejectedValue(new Error('failed'));
const settings: TaskSettingsV2 = {
version: 2,
initialDelayDuration: undefined,
cadence: '* * * * * *',
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
};
const checkFrequency = Duration.fromObject({ milliseconds: 100 });
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
worker.start(settings, { signal: testScopedSignal() });
await waitForExpect(() => {
expect(logger.error).toHaveBeenCalled();
});
},
);
it.each(databases.eachSupportedId())(
'runs tasks more than once even when the task throws, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn().mockRejectedValue(new Error('failed'));
const settings: TaskSettingsV2 = {
version: 2,
initialDelayDuration: undefined,
cadence: '* * * * * *',
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
};
const checkFrequency = Duration.fromObject({ milliseconds: 100 });
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
worker.start(settings, { signal: testScopedSignal() });
await waitForExpect(() => {
expect(fn).toHaveBeenCalledTimes(3);
});
},
);
it.each(databases.eachSupportedId())(
'does not clobber ticket lock when stolen, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const settings: TaskSettingsV2 = {
version: 2,
initialDelayDuration: undefined,
cadence: '* * * * * *',
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
};
const worker = new TaskWorker('task1', fn, knex, logger);
await worker.persistTask(settings);
await waitForExpect(async () => {
await expect(worker.findReadyTask()).resolves.toEqual({
result: 'ready',
settings,
});
});
await expect(worker.tryClaimTask('ticket', settings)).resolves.toBe(true);
let row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: 'ticket',
current_run_started_at: expect.anything(),
current_run_expires_at: expect.anything(),
}),
);
await knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', 'task1')
.update({ current_run_ticket: 'stolen' });
await expect(worker.tryReleaseTask('ticket', settings)).resolves.toBe(
false,
);
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row).toEqual(
expect.objectContaining({
id: 'task1',
current_run_ticket: 'stolen',
current_run_started_at: expect.anything(),
current_run_expires_at: expect.anything(),
}),
);
},
);
it.each(databases.eachSupportedId())(
'gracefully handles a disappeared task row, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(async () => {});
const settings: TaskSettingsV2 = {
version: 2,
initialDelayDuration: undefined,
cadence: '* * * * * *',
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
};
const worker1 = new TaskWorker('task1', fn, knex, logger);
await worker1.persistTask(settings);
await knex<DbTasksRow>(DB_TASKS_TABLE).where('id', '=', 'task1').delete();
await expect(worker1.findReadyTask()).resolves.toEqual({
result: 'abort',
});
const worker2 = new TaskWorker('task2', fn, knex, logger);
await worker2.persistTask(settings);
await waitForExpect(async () => {
await expect(worker2.findReadyTask()).resolves.toEqual({
result: 'ready',
settings,
});
});
await knex<DbTasksRow>(DB_TASKS_TABLE).where('id', '=', 'task2').delete();
await expect(worker2.tryClaimTask('ticket', settings)).resolves.toBe(
false,
);
const worker3 = new TaskWorker('task3', fn, knex, logger);
await worker3.persistTask(settings);
await waitForExpect(async () => {
await expect(worker3.findReadyTask()).resolves.toEqual({
result: 'ready',
settings,
});
});
await expect(worker3.tryClaimTask('ticket', settings)).resolves.toBe(
true,
);
await knex<DbTasksRow>(DB_TASKS_TABLE).where('id', '=', 'task3').delete();
await expect(worker3.tryReleaseTask('ticket', settings)).resolves.toBe(
false,
);
},
);
it.each(databases.eachSupportedId())(
'respects initialDelayDuration per worker, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const abortFirst = new AbortController();
const settings: TaskSettingsV2 = {
version: 2,
initialDelayDuration: 'PT0.3S',
cadence: 'PT0.1S',
timeoutAfterDuration: 'PT10S',
};
// Start a single worker and make sure it waits and then goes to work
const fn1 = jest.fn(async () => {});
const worker1 = new TaskWorker(
'task1',
fn1,
knex,
logger,
Duration.fromMillis(10),
);
await worker1.start(settings, { signal: abortFirst.signal });
expect(fn1).toHaveBeenCalledTimes(0);
await new Promise(resolve => setTimeout(resolve, 250));
expect(fn1).toHaveBeenCalledTimes(0);
await new Promise(resolve => setTimeout(resolve, 100));
expect(fn1.mock.calls.length).toBeGreaterThan(0);
// Start a second worker and make sure it waits but the first worker still works along
const fn2 = jest.fn();
const promise2 = new Promise(resolve => fn2.mockImplementation(resolve));
const worker2 = new TaskWorker(
'task1',
fn2,
knex,
logger,
Duration.fromMillis(10),
);
await worker2.start(settings, { signal: testScopedSignal() });
// We eventually abort the first worker just to make sure that the second
// one for sure will get a go at running the task
setTimeout(() => abortFirst.abort(), 1000);
const before = fn1.mock.calls.length;
await promise2;
expect(fn1.mock.calls.length).toBeGreaterThan(before);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'next_run_start_at is always the min between schedule changes from cron frequency, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const settings: TaskSettingsV2 = {
version: 2,
cadence: '*/15 * * * *',
initialDelayDuration: 'PT2M',
timeoutAfterDuration: 'PT1M',
};
const worker = new TaskWorker('task99', fn, knex, logger);
await worker.persistTask(settings);
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
const settings2 = {
...settings,
cadence: '*/2 * * * *',
initialDelayDuration: 'PT1M',
};
await worker.persistTask(settings2);
const row2 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row2.next_run_start_at).not.toStrictEqual(row1.next_run_start_at);
const settings3 = { ...settings };
await worker.persistTask(settings3);
const row3 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
// The new timestamp can basically be 0 or a minute depending on how the
// initialDelayDuration falls right on a cron boundary. This kinda
// contrived check removes a test flakiness based on wall clock time.
expect(
Math.abs(
+new Date(row3.next_run_start_at) - +new Date(row2.next_run_start_at),
),
).toBeLessThanOrEqual(60_000);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'next_run_start_at is always the min between schedule changes when using human duration frequency, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const initialSettings: TaskSettingsV2 = {
version: 2,
cadence: 'PT120M',
timeoutAfterDuration: 'PT1M',
};
const worker = new TaskWorker('task99', fn, knex, logger);
await worker.persistTask(initialSettings);
// replicate task running, sets next_run_start_at based on cadence
await worker.tryClaimTask('ticket', initialSettings);
await worker.tryReleaseTask('ticket', initialSettings);
// grab initial row for comparisons later
const rowAfterClaimAndRelease = (
await knex<DbTasksRow>(DB_TASKS_TABLE)
)[0];
const settings: TaskSettingsV2 = {
...initialSettings,
cadence: 'PT60M',
};
await worker.persistTask(settings);
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
new Date(rowAfterClaimAndRelease.next_run_start_at),
);
const row1NextStartAt = DateTime.fromJSDate(
new Date(row1.next_run_start_at),
);
const now = DateTime.now();
expect(
rowAfterClaimAndReleaseNextStartAt.diff(row1NextStartAt).as('minutes'),
).toBeCloseTo(60, 1); // ensure that next start at is sooner than initial by one hour
expect(row1NextStartAt.diff(now).as('minutes')).toBeCloseTo(60, 1); // ensure that next start at is later than now by one hour
expect(
rowAfterClaimAndReleaseNextStartAt.diff(now).as('minutes'),
).toBeCloseTo(120, 1);
const settings2 = {
...settings,
};
await worker.persistTask(settings2);
const row2 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row2.next_run_start_at).toStrictEqual(row1.next_run_start_at);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'next_run_start_at is always the min between schedule changes when using human duration frequency with initial start delay, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const initialSettings: TaskSettingsV2 = {
version: 2,
cadence: 'PT120M',
initialDelayDuration: 'PT2M',
timeoutAfterDuration: 'PT1M',
};
const worker = new TaskWorker('task99', fn, knex, logger);
await worker.persistTask(initialSettings);
// replicate task running, sets next_run_start_at based on cadence
await worker.tryClaimTask('ticket', initialSettings);
await worker.tryReleaseTask('ticket', initialSettings);
// grab initial row for comparisons later
const rowAfterClaimAndRelease = (
await knex<DbTasksRow>(DB_TASKS_TABLE)
)[0];
const settings: TaskSettingsV2 = {
...initialSettings,
cadence: 'PT60M',
};
await worker.persistTask(settings);
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
new Date(rowAfterClaimAndRelease.next_run_start_at),
);
const row1NextStartAt = DateTime.fromJSDate(
new Date(row1.next_run_start_at),
);
const now = DateTime.now();
expect(
rowAfterClaimAndReleaseNextStartAt.diff(row1NextStartAt).as('minutes'),
).toBeCloseTo(62, 1); // ensure that next start at is sooner than initial by one hour, plus the 2 minute delay (set my tryReleaseTask)
expect(row1NextStartAt.diff(now).as('minutes')).toBeCloseTo(60, 1); // ensure that next start at is later than now by one hour (2 minute delay doesn't take effect here)
expect(
rowAfterClaimAndReleaseNextStartAt.diff(now).as('minutes'),
).toBeCloseTo(122, 1); // includes 2 minute start delay (which is persisted from tryReleaseTask)
const settings2 = {
...settings,
};
await worker.persistTask(settings2);
const row2 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row2.next_run_start_at).toStrictEqual(row1.next_run_start_at);
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'next_run_start_at is not set for manually-triggered tasks, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const initialSettings: TaskSettingsV2 = {
version: 2,
cadence: 'manual',
timeoutAfterDuration: 'PT1M',
};
const worker = new TaskWorker('task99', fn, knex, logger);
await worker.persistTask(initialSettings);
await worker.tryClaimTask('ticket', initialSettings);
await worker.tryReleaseTask('ticket', initialSettings);
const row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row.next_run_start_at).toBeNull();
await knex.destroy();
},
);
});
@@ -1,381 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConflictError, NotFoundError } from '@backstage/errors';
import { CronTime } from 'cron';
import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
import { v4 as uuid } from 'uuid';
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
import { TaskFunction, TaskSettingsV2, taskSettingsV2Schema } from './types';
import { delegateAbortController, nowPlus, sleep } from './util';
import { LoggerService } from '@backstage/backend-plugin-api';
const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
/**
* Implements tasks that run across worker hosts, with collaborative locking.
*
* @private
*/
export class TaskWorker {
constructor(
private readonly taskId: string,
private readonly fn: TaskFunction,
private readonly knex: Knex,
private readonly logger: LoggerService,
private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY,
) {}
async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
try {
await this.persistTask(settings);
} catch (e) {
throw new Error(`Failed to persist task, ${e}`);
}
this.logger.info(
`Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`,
);
let workCheckFrequency = this.workCheckFrequency;
const isDuration = settings?.cadence.startsWith('P');
if (isDuration) {
const cadence = Duration.fromISO(settings.cadence);
if (cadence < workCheckFrequency) {
workCheckFrequency = cadence;
}
}
let attemptNum = 1;
(async () => {
for (;;) {
try {
if (settings.initialDelayDuration) {
await sleep(
Duration.fromISO(settings.initialDelayDuration),
options?.signal,
);
}
while (!options?.signal?.aborted) {
const runResult = await this.runOnce(options?.signal);
if (runResult.result === 'abort') {
break;
}
await sleep(workCheckFrequency, options?.signal);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
attemptNum = 0;
break;
} catch (e) {
attemptNum += 1;
this.logger.warn(
`Task worker failed unexpectedly, attempt number ${attemptNum}, ${e}`,
);
await sleep(Duration.fromObject({ seconds: 1 }));
}
}
})();
}
static async trigger(knex: Knex, taskId: string): Promise<void> {
// check if task exists
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE)
.select(knex.raw(1))
.where('id', '=', taskId);
if (rows.length !== 1) {
throw new NotFoundError(`Task ${taskId} does not exist`);
}
const updatedRows = await knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', taskId)
.whereNull('current_run_ticket')
.update({
next_run_start_at: knex.fn.now(),
});
if (updatedRows < 1) {
throw new ConflictError(`Task ${taskId} is currently running`);
}
}
/**
* Makes a single attempt at running the task to completion, if ready.
*
* @returns The outcome of the attempt
*/
private async runOnce(
signal?: AbortSignal,
): Promise<
| { result: 'not-ready-yet' }
| { result: 'abort' }
| { result: 'failed' }
| { result: 'completed' }
> {
const findResult = await this.findReadyTask();
if (
findResult.result === 'not-ready-yet' ||
findResult.result === 'abort'
) {
return findResult;
}
const taskSettings = findResult.settings;
const ticket = uuid();
const claimed = await this.tryClaimTask(ticket, taskSettings);
if (!claimed) {
return { result: 'not-ready-yet' };
}
// Abort the task execution either if the worker is stopped, or if the
// task timeout is hit
const taskAbortController = delegateAbortController(signal);
const timeoutHandle = setTimeout(() => {
taskAbortController.abort();
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
try {
await this.fn(taskAbortController.signal);
taskAbortController.abort(); // releases resources
} catch (e) {
this.logger.error(e);
await this.tryReleaseTask(ticket, taskSettings);
return { result: 'failed' };
} finally {
clearTimeout(timeoutHandle);
}
await this.tryReleaseTask(ticket, taskSettings);
return { result: 'completed' };
}
/**
* Perform the initial store of the task info
*/
async persistTask(settings: TaskSettingsV2) {
// Perform an initial parse to ensure that we will definitely be able to
// read it back again.
taskSettingsV2Schema.parse(settings);
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
const isCron = !isManual && !isDuration;
let startAt: Knex.Raw | undefined;
let nextStartAt: Knex.Raw | undefined;
if (settings.initialDelayDuration) {
startAt = nowPlus(
Duration.fromISO(settings.initialDelayDuration),
this.knex,
);
}
if (isCron) {
const time = new CronTime(settings.cadence)
.sendAt()
.minus({ seconds: 1 }) // immediately, if "* * * * * *"
.toUTC();
nextStartAt = this.nextRunAtRaw(time);
startAt ||= nextStartAt;
} else if (isManual) {
nextStartAt = this.knex.raw('null');
startAt ||= nextStartAt;
} else {
startAt ||= this.knex.fn.now();
nextStartAt = nowPlus(Duration.fromISO(settings.cadence), this.knex);
}
this.logger.debug(`task: ${this.taskId} configured to run at: ${startAt}`);
// It's OK if the task already exists; if it does, just replace its
// settings with the new value and start the loop as usual.
const settingsJson = JSON.stringify(settings);
await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.insert({
id: this.taskId,
settings_json: settingsJson,
next_run_start_at: startAt,
})
.onConflict('id')
.merge(
this.knex.client.config.client.includes('mysql')
? {
settings_json: settingsJson,
next_run_start_at: this.knex.raw(
`CASE WHEN ?? < ?? THEN ?? ELSE ?? END`,
[
nextStartAt,
'next_run_start_at',
nextStartAt,
'next_run_start_at',
],
),
}
: {
settings_json: this.knex.ref('excluded.settings_json'),
next_run_start_at: this.knex.raw(
`CASE WHEN ?? < ?? THEN ?? ELSE ?? END`,
[
nextStartAt,
`${DB_TASKS_TABLE}.next_run_start_at`,
nextStartAt,
`${DB_TASKS_TABLE}.next_run_start_at`,
],
),
},
);
}
/**
* Check if the task is ready to run
*/
async findReadyTask(): Promise<
| { result: 'not-ready-yet' }
| { result: 'abort' }
| { result: 'ready'; settings: TaskSettingsV2 }
> {
const [row] = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', this.taskId)
.select({
settingsJson: 'settings_json',
ready: this.knex.raw(
`CASE
WHEN next_run_start_at <= ? AND current_run_ticket IS NULL THEN TRUE
ELSE FALSE
END`,
[this.knex.fn.now()],
),
});
if (!row) {
this.logger.info(
'No longer able to find task; aborting and assuming that it has been unregistered or expired',
);
return { result: 'abort' };
} else if (!row.ready) {
return { result: 'not-ready-yet' };
}
try {
const obj = JSON.parse(row.settingsJson);
const settings = taskSettingsV2Schema.parse(obj);
return { result: 'ready', settings };
} catch (e) {
this.logger.info(
`Task "${this.taskId}" is no longer able to parse task settings; aborting and assuming that a ` +
`newer version of the task has been issued and being handled by other workers, ${e}`,
);
return { result: 'abort' };
}
}
/**
* Attempts to claim a task that's ready for execution, on this worker's
* behalf. We should not attempt to perform the work unless the claim really
* goes through.
*
* @param ticket - A globally unique string that changes for each invocation
* @param settings - The settings of the task to claim
* @returns True if it was successfully claimed
*/
async tryClaimTask(
ticket: string,
settings: TaskSettingsV2,
): Promise<boolean> {
const startedAt = this.knex.fn.now();
const expiresAt = settings.timeoutAfterDuration
? nowPlus(Duration.fromISO(settings.timeoutAfterDuration), this.knex)
: this.knex.raw('null');
const rows = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', this.taskId)
.whereNull('current_run_ticket')
.update({
current_run_ticket: ticket,
current_run_started_at: startedAt,
current_run_expires_at: expiresAt,
});
return rows === 1;
}
async tryReleaseTask(
ticket: string,
settings: TaskSettingsV2,
): Promise<boolean> {
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
const isCron = !isManual && !isDuration;
let nextRun: Knex.Raw;
if (isCron) {
const time = new CronTime(settings.cadence).sendAt().toUTC();
this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
nextRun = this.nextRunAtRaw(time);
} else if (isManual) {
nextRun = this.knex.raw('null');
} else {
const dt = Duration.fromISO(settings.cadence).as('seconds');
this.logger.debug(
`task: ${this.taskId} will next occur around ${DateTime.now().plus({
seconds: dt,
})}`,
);
if (this.knex.client.config.client.includes('sqlite3')) {
nextRun = this.knex.raw(
`max(datetime(next_run_start_at, ?), datetime('now'))`,
[`+${dt} seconds`],
);
} else if (this.knex.client.config.client.includes('mysql')) {
nextRun = this.knex.raw(
`greatest(next_run_start_at + interval ${dt} second, now())`,
);
} else {
nextRun = this.knex.raw(
`greatest(next_run_start_at + interval '${dt} seconds', now())`,
);
}
}
const rows = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
.where('id', '=', this.taskId)
.where('current_run_ticket', '=', ticket)
.update({
next_run_start_at: nextRun,
current_run_ticket: this.knex.raw('null'),
current_run_started_at: this.knex.raw('null'),
current_run_expires_at: this.knex.raw('null'),
});
return rows === 1;
}
private nextRunAtRaw(time: DateTime): Knex.Raw {
if (this.knex.client.config.client.includes('sqlite3')) {
return this.knex.raw('datetime(?)', [time.toISO()]);
} else if (this.knex.client.config.client.includes('mysql')) {
return this.knex.raw(`?`, [time.toSQL({ includeOffset: false })]);
}
return this.knex.raw(`?`, [time.toISO()]);
}
}
@@ -1,28 +0,0 @@
/*
* Copyright 2023 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export function createTestScopedSignal(): () => AbortSignal {
let testAbortController = new AbortController();
beforeEach(() => {
testAbortController = new AbortController();
});
afterEach(() => {
testAbortController.abort();
});
return () => testAbortController.signal;
}
-27
View File
@@ -1,27 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
export { readTaskScheduleDefinitionFromConfig } from './readTaskScheduleDefinitionFromConfig';
export { TaskScheduler } from './TaskScheduler';
export type {
PluginTaskScheduler,
TaskFunction,
TaskDescriptor,
TaskInvocationDefinition,
TaskRunner,
TaskScheduleDefinition,
TaskScheduleDefinitionConfig,
} from './types';
@@ -1,131 +0,0 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ConfigReader } from '@backstage/config';
import { HumanDuration } from '@backstage/types';
import { readTaskScheduleDefinitionFromConfig } from './readTaskScheduleDefinitionFromConfig';
describe('readTaskScheduleDefinitionFromConfig', () => {
it('all valid values', () => {
const config = new ConfigReader({
frequency: {
cron: '0 30 * * * *',
},
timeout: 'PT3M',
initialDelay: {
minutes: 20,
},
scope: 'global',
});
const result = readTaskScheduleDefinitionFromConfig(config);
expect((result.frequency as { cron: string }).cron).toBe('0 30 * * * *');
expect(result.timeout).toEqual({ minutes: 3 });
expect((result.initialDelay as HumanDuration).minutes).toEqual(20);
expect(result.scope).toBe('global');
});
it('all valid required values', () => {
const config = new ConfigReader({
frequency: {
cron: '0 30 * * * *',
},
timeout: 'PT3M',
});
const result = readTaskScheduleDefinitionFromConfig(config);
expect((result.frequency as { cron: string }).cron).toBe('0 30 * * * *');
expect(result.timeout).toEqual({ minutes: 3 });
expect(result.initialDelay).toBeUndefined();
expect(result.scope).toBeUndefined();
});
it('fail without required frequency', () => {
const config = new ConfigReader({
timeout: 'PT3M',
});
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
"Missing required config value at 'frequency'",
);
});
it('fail without required timeout', () => {
const config = new ConfigReader({
frequency: 'PT30M',
});
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
"Missing required config value at 'timeout'",
);
});
it('invalid frequency key', () => {
const config = new ConfigReader({
frequency: {
invalid: 'value',
},
timeout: 'PT3M',
});
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
"Failed to read duration from config at 'frequency', Error: Needs one or more of 'years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'milliseconds'",
);
});
it('invalid frequency value', () => {
const config = new ConfigReader({
frequency: {
minutes: 'value',
},
timeout: 'PT3M',
});
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
"Failed to read duration from config, Error: Unable to convert config value for key 'frequency.minutes' in 'mock-config' to a number",
);
});
it('frequency value with additional invalid prop', () => {
const config = new ConfigReader({
frequency: {
minutes: 20,
invalid: 'value',
},
timeout: 'PT3M',
});
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
"Failed to read duration from config at 'frequency', Error: Unknown property 'invalid'; expected one or more of 'years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'milliseconds'",
);
});
it('invalid scope value', () => {
const config = new ConfigReader({
frequency: {
years: 2,
},
timeout: 'PT3M',
scope: 'invalid',
});
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
'Only "global" or "local" are allowed for TaskScheduleDefinition.scope, but got: invalid',
);
});
});
@@ -1,85 +0,0 @@
/*
* Copyright 2022 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Config, readDurationFromConfig } from '@backstage/config';
import { HumanDuration } from '@backstage/types';
import { TaskScheduleDefinition } from './types';
import { Duration } from 'luxon';
function readDuration(config: Config, key: string): HumanDuration {
if (typeof config.get(key) === 'string') {
const value = config.getString(key);
const duration = Duration.fromISO(value);
if (!duration.isValid) {
throw new Error(`Invalid duration: ${value}`);
}
return duration.toObject();
}
return readDurationFromConfig(config, { key });
}
function readFrequency(
config: Config,
key: string,
): { cron: string } | Duration | HumanDuration | { trigger: 'manual' } {
const value = config.get(key);
if (typeof value === 'object' && (value as { cron?: string }).cron) {
return value as { cron: string };
}
if (
typeof value === 'object' &&
(value as { trigger?: string }).trigger === 'manual'
) {
return { trigger: 'manual' };
}
return readDuration(config, key);
}
/**
* Reads a TaskScheduleDefinition from a Config.
* Expects the config not to be the root config,
* but the config for the definition.
*
* @param config - config for a TaskScheduleDefinition.
* @public
* @deprecated Please import `readSchedulerServiceTaskScheduleDefinitionFromConfig` from `@backstage/backend-plugin-api` instead
*/
export function readTaskScheduleDefinitionFromConfig(
config: Config,
): TaskScheduleDefinition {
const frequency = readFrequency(config, 'frequency');
const timeout = readDuration(config, 'timeout');
const initialDelay = config.has('initialDelay')
? readDuration(config, 'initialDelay')
: undefined;
const scope = config.getOptionalString('scope');
if (scope && !['global', 'local'].includes(scope)) {
throw new Error(
`Only "global" or "local" are allowed for TaskScheduleDefinition.scope, but got: ${scope}`,
);
}
return {
frequency,
timeout,
initialDelay,
scope: scope as 'global' | 'local' | undefined,
};
}
-437
View File
@@ -1,437 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { HumanDuration, JsonObject } from '@backstage/types';
import { CronTime } from 'cron';
import { Duration } from 'luxon';
import { z } from 'zod';
/**
* A function that can be called as a scheduled task.
*
* It may optionally accept an abort signal argument. When the signal triggers,
* processing should abort and return as quickly as possible.
*
* @public
* @deprecated Please import `SchedulerServiceTaskFunction` from `@backstage/backend-plugin-api` instead
*/
export type TaskFunction =
| ((abortSignal: AbortSignal) => void | Promise<void>)
| (() => void | Promise<void>);
/**
* A semi-opaque type to describe an actively scheduled task.
*
* @public
* @deprecated Please import `SchedulerServiceTaskDescriptor` from `@backstage/backend-plugin-api` instead
*/
export type TaskDescriptor = {
/**
* The unique identifier of the task.
*/
id: string;
/**
* The scope of the task.
*/
scope: 'global' | 'local';
/**
* The settings that control the task flow. This is a semi-opaque structure
* that is mainly there for debugging purposes. Do not make any assumptions
* about the contents of this field.
*/
settings: { version: number } & JsonObject;
};
/**
* Options that control the scheduling of a task.
*
* @public
* @deprecated Please import `SchedulerServiceTaskScheduleDefinition` from `@backstage/backend-plugin-api` instead
*/
export interface TaskScheduleDefinition {
/**
* How often you want the task to run. The system does its best to avoid
* overlapping invocations.
*
* @remarks
*
* This is the best effort value; under some circumstances there can be
* deviations. For example, if the task runtime is longer than the frequency
* and the timeout has not been given or not been exceeded yet, the next
* invocation of this task will be delayed until after the previous one
* finishes.
*
* This is a required field.
*/
frequency:
| {
/**
* A crontab style string.
*
* @remarks
*
* Overview:
*
* ```
* second (optional)
* minute
* hour
* day of month
* month
* day of week
*
*
* * * * * * *
* ```
*/
cron: string;
}
| Duration
| HumanDuration
/**
* This task will only run when manually triggered with the `triggerTask` method; no automatic
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
*/
| { trigger: 'manual' };
/**
* The maximum amount of time that a single task invocation can take, before
* it's considered timed out and gets "released" such that a new invocation
* is permitted to take place (possibly, then, on a different worker).
*/
timeout: Duration | HumanDuration;
/**
* The amount of time that should pass before the first invocation happens.
*
* @remarks
*
* This can be useful in cold start scenarios to stagger or delay some heavy
* compute jobs. If no value is given for this field then the first invocation
* will happen as soon as possible according to the cadence.
*
* NOTE: This is a per-worker delay. If you have a cluster of workers all
* collaborating on a task that has its `scope` field set to `'global'`, then
* you may still see the task being processed by other long-lived workers,
* while any given single worker is in its initial sleep delay time e.g. after
* a deployment. Therefore, this parameter is not useful for "globally" pausing
* work; its main intended use is for individual machines to get a chance to
* reach some equilibrium at startup before triggering heavy batch workloads.
*/
initialDelay?: Duration | HumanDuration;
/**
* Sets the scope of concurrency control / locking to apply for invocations of
* this task.
*
* @remarks
*
* When the scope is set to the default value `'global'`, the scheduler will
* attempt to ensure that only one worker machine runs the task at a time,
* according to the given cadence. This means that as the number of worker
* hosts increases, the invocation frequency of this task will not go up.
* Instead, the load is spread randomly across hosts. This setting is useful
* for tasks that access shared resources, for example catalog ingestion tasks
* where you do not want many machines to repeatedly import the same data and
* trample over each other.
*
* When the scope is set to `'local'`, there is no concurrency control across
* hosts. Each host runs the task according to the given cadence similarly to
* `setInterval`, but the runtime ensures that there are no overlapping runs.
*
* @defaultValue 'global'
*/
scope?: 'global' | 'local';
}
/**
* Config options for {@link TaskScheduleDefinition}
* that control the scheduling of a task.
*
* @public
* @deprecated Please import `SchedulerServiceTaskScheduleDefinitionConfig` from `@backstage/backend-plugin-api` instead
*/
export interface TaskScheduleDefinitionConfig {
/**
* How often you want the task to run. The system does its best to avoid
* overlapping invocations.
*
* @remarks
*
* This is the best effort value; under some circumstances there can be
* deviations. For example, if the task runtime is longer than the frequency
* and the timeout has not been given or not been exceeded yet, the next
* invocation of this task will be delayed until after the previous one
* finishes.
*
* This is a required field.
*/
frequency:
| {
/**
* A crontab style string.
*
* @remarks
*
* Overview:
*
* ```
* second (optional)
* minute
* hour
* day of month
* month
* day of week
*
*
* * * * * * *
* ```
*/
cron: string;
}
| string
| HumanDuration;
/**
* The maximum amount of time that a single task invocation can take, before
* it's considered timed out and gets "released" such that a new invocation
* is permitted to take place (possibly, then, on a different worker).
*/
timeout: string | HumanDuration;
/**
* The amount of time that should pass before the first invocation happens.
*
* @remarks
*
* This can be useful in cold start scenarios to stagger or delay some heavy
* compute jobs. If no value is given for this field then the first invocation
* will happen as soon as possible according to the cadence.
*
* NOTE: This is a per-worker delay. If you have a cluster of workers all
* collaborating on a task that has its `scope` field set to `'global'`, then
* you may still see the task being processed by other long-lived workers,
* while any given single worker is in its initial sleep delay time e.g. after
* a deployment. Therefore, this parameter is not useful for "globally" pausing
* work; its main intended use is for individual machines to get a chance to
* reach some equilibrium at startup before triggering heavy batch workloads.
*/
initialDelay?: string | HumanDuration;
/**
* Sets the scope of concurrency control / locking to apply for invocations of
* this task.
*
* @remarks
*
* When the scope is set to the default value `'global'`, the scheduler will
* attempt to ensure that only one worker machine runs the task at a time,
* according to the given cadence. This means that as the number of worker
* hosts increases, the invocation frequency of this task will not go up.
* Instead, the load is spread randomly across hosts. This setting is useful
* for tasks that access shared resources, for example catalog ingestion tasks
* where you do not want many machines to repeatedly import the same data and
* trample over each other.
*
* When the scope is set to `'local'`, there is no concurrency control across
* hosts. Each host runs the task according to the given cadence similarly to
* `setInterval`, but the runtime ensures that there are no overlapping runs.
*
* @defaultValue 'global'
*/
scope?: 'global' | 'local';
}
/**
* Options that apply to the invocation of a given task.
*
* @public
* @deprecated Please import `SchedulerServiceTaskInvocationDefinition` from `@backstage/backend-plugin-api` instead
*/
export interface TaskInvocationDefinition {
/**
* A unique ID (within the scope of the plugin) for the task.
*/
id: string;
/**
* The actual task function to be invoked regularly.
*/
fn: TaskFunction;
/**
* An abort signal that, when triggered, will stop the recurring execution of
* the task.
*/
signal?: AbortSignal;
}
/**
* A previously prepared task schedule, ready to be invoked.
*
* @public
* @deprecated Please import `SchedulerServiceTaskRunner` from `@backstage/backend-plugin-api` instead
*/
export interface TaskRunner {
/**
* Takes the schedule and executes an actual task using it.
*
* @param task - The actual runtime properties of the task
*/
run(task: TaskInvocationDefinition): Promise<void>;
}
/**
* Deals with the scheduling of distributed tasks, for a given plugin.
*
* @public
* @deprecated Please use `SchedulerService` from `@backstage/backend-plugin-api` instead (most likely via `coreServices.scheduler`)
*/
export interface PluginTaskScheduler {
/**
* Manually triggers a task by ID.
*
* If the task doesn't exist, a NotFoundError is thrown. If the task is
* currently running, a ConflictError is thrown.
*
* @param id - The task ID
*/
triggerTask(id: string): Promise<void>;
/**
* Schedules a task function for recurring runs.
*
* @remarks
*
* The `scope` task field controls whether to use coordinated exclusive
* invocation across workers, or to just coordinate within the current worker.
*
* This convenience method performs both the scheduling and invocation in one
* go.
*
* @param task - The task definition
*/
scheduleTask(
task: TaskScheduleDefinition & TaskInvocationDefinition,
): Promise<void>;
/**
* Creates a scheduled but dormant recurring task, ready to be launched at a
* later time.
*
* @remarks
*
* This method is useful for pre-creating a schedule in outer code to be
* passed into an inner implementation, such that the outer code controls
* scheduling while inner code controls implementation.
*
* @param schedule - The task schedule
*/
createScheduledTaskRunner(schedule: TaskScheduleDefinition): TaskRunner;
/**
* Returns all scheduled tasks registered to this scheduler.
*
* @remarks
*
* This method is useful for triggering tasks manually using the triggerTask
* functionality. Note that the returned tasks contain only tasks that have
* been initialized in this instance of the scheduler.
*
* @returns Scheduled tasks
*/
getScheduledTasks(): Promise<TaskDescriptor[]>;
}
function isValidOptionalDurationString(d: string | undefined): boolean {
try {
return !d || Duration.fromISO(d).isValid;
} catch {
return false;
}
}
function isValidCronFormat(c: string | undefined): boolean {
try {
if (!c) {
return false;
}
// parse cron format to ensure it's a valid format.
// eslint-disable-next-line no-new
new CronTime(c);
return true;
} catch {
return false;
}
}
function isValidTrigger(t: string): boolean {
return t === 'manual';
}
export const taskSettingsV1Schema = z.object({
version: z.literal(1),
initialDelayDuration: z
.string()
.optional()
.refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
}),
recurringAtMostEveryDuration: z
.string()
.refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
}),
timeoutAfterDuration: z.string().refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
}),
});
/**
* The properties that control a scheduled task (version 1).
*/
export type TaskSettingsV1 = z.infer<typeof taskSettingsV1Schema>;
export const taskSettingsV2Schema = z.object({
version: z.literal(2),
cadence: z
.string()
.refine(isValidCronFormat, { message: 'Invalid cron' })
.or(
z.string().refine(isValidTrigger, {
message: "Invalid trigger, expecting 'manual'",
}),
)
.or(
z.string().refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
}),
),
timeoutAfterDuration: z.string().refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
}),
initialDelayDuration: z
.string()
.optional()
.refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
}),
});
/**
* The properties that control a scheduled task (version 2).
*/
export type TaskSettingsV2 = z.infer<typeof taskSettingsV2Schema>;
@@ -1,113 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import knexFactory, { Knex } from 'knex';
import { Duration } from 'luxon';
import { delegateAbortController, nowPlus, sleep, validateId } from './util';
class KnexBuilder {
public build(client: string): Knex {
return knexFactory({ client, useNullAsDefault: true });
}
}
describe('util', () => {
describe('validateId', () => {
it.each(['a', 'a_b', 'ab123c_2', 'a!', 'A', 'a-b', 'a.b', '_a', 'a_'])(
'accepts valid inputs, %p',
async input => {
expect(validateId(input)).toBeUndefined();
},
);
it.each(['', null, Symbol('a')])(
'rejects invalid inputs, %p',
async input => {
expect(() => validateId(input as any)).toThrow();
},
);
});
describe('sleep', () => {
it('finishes the wait as expected with no signal', async () => {
const ac = new AbortController();
const start = Date.now();
await sleep(Duration.fromObject({ seconds: 1 }), ac.signal);
expect(Date.now() - start).toBeGreaterThan(800);
}, 5_000);
it('aborts properly on the signal', async () => {
const ac = new AbortController();
const promise = sleep(Duration.fromObject({ seconds: 10 }), ac.signal);
ac.abort();
await promise;
expect(true).toBe(true);
}, 1_000);
});
describe('delegateAbortController', () => {
it('inherits parent abort state', () => {
const parent = new AbortController();
const child = delegateAbortController(parent.signal);
expect(parent.signal.aborted).toBe(false);
expect(child.signal.aborted).toBe(false);
parent.abort();
expect(parent.signal.aborted).toBe(true);
expect(child.signal.aborted).toBe(true);
});
it('does not inherit from child to parent', () => {
const parent = new AbortController();
const child = delegateAbortController(parent.signal);
expect(parent.signal.aborted).toBe(false);
expect(child.signal.aborted).toBe(false);
child.abort();
expect(parent.signal.aborted).toBe(false);
expect(child.signal.aborted).toBe(true);
});
});
describe('nowPlus', () => {
describe('without duration', () => {
const databases = [
{ client: 'sqlite3', expected: 'CURRENT_TIMESTAMP' },
{ client: 'mysql2', expected: 'CURRENT_TIMESTAMP' },
{ client: 'pg', expected: 'CURRENT_TIMESTAMP' },
];
it.each(databases)('for client $client', ({ client, expected }) => {
const knex = new KnexBuilder().build(client);
const result = nowPlus(undefined, knex);
expect(result.toString()).toBe(expected);
});
});
describe('With duration', () => {
const databases = [
{ client: 'sqlite3', expected: "datetime('now', '20 seconds')" },
{ client: 'mysql2', expected: 'now() + interval 20 second' },
{ client: 'pg', expected: "now() + interval '20 seconds'" },
];
it.each(databases)('for client $client', ({ client, expected }) => {
const duration = Duration.fromObject({ seconds: 20 });
const knex = new KnexBuilder().build(client);
const result = nowPlus(duration, knex);
expect(result.toString()).toBe(expected);
});
});
});
});
-111
View File
@@ -1,111 +0,0 @@
/*
* Copyright 2021 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { InputError } from '@backstage/errors';
import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
// Keep the IDs compatible with e.g. Prometheus labels
export function validateId(id: string) {
if (typeof id !== 'string' || !id.trim()) {
throw new InputError(`${id} is not a valid ID, expected non-empty string`);
}
}
export function dbTime(t: Date | string): DateTime {
if (typeof t === 'string') {
return DateTime.fromSQL(t);
}
return DateTime.fromJSDate(t);
}
export function nowPlus(duration: Duration | undefined, knex: Knex) {
const seconds = duration?.as('seconds') ?? 0;
if (!seconds) {
return knex.fn.now();
}
if (knex.client.config.client.includes('sqlite3')) {
return knex.raw(`datetime('now', ?)`, [`${seconds} seconds`]);
}
if (knex.client.config.client.includes('mysql')) {
return knex.raw(`now() + interval ${seconds} second`);
}
return knex.raw(`now() + interval '${seconds} seconds'`);
}
/**
* Sleep for the given duration, but return sooner if the abort signal
* triggers.
*
* @param duration - The amount of time to sleep, at most
* @param abortSignal - An optional abort signal that short circuits the wait
*/
export async function sleep(
duration: Duration,
abortSignal?: AbortSignal,
): Promise<void> {
if (abortSignal?.aborted) {
return;
}
await new Promise<void>(resolve => {
let timeoutHandle: NodeJS.Timeout | undefined = undefined;
const done = () => {
if (timeoutHandle) {
clearTimeout(timeoutHandle);
}
abortSignal?.removeEventListener('abort', done);
resolve();
};
timeoutHandle = setTimeout(done, duration.as('milliseconds'));
abortSignal?.addEventListener('abort', done);
});
}
/**
* Creates a new AbortController that, in addition to working as a regular
* standalone controller, also gets aborted if the given parent signal
* reaches aborted state.
*
* @param parent - The "parent" signal that can trigger the delegate
*/
export function delegateAbortController(parent?: AbortSignal): AbortController {
const delegate = new AbortController();
if (parent) {
if (parent.aborted) {
delegate.abort();
} else {
const onParentAborted = () => {
delegate.abort();
};
const onChildAborted = () => {
parent.removeEventListener('abort', onParentAborted);
};
parent.addEventListener('abort', onParentAborted, { once: true });
delegate.signal.addEventListener('abort', onChildAborted, { once: true });
}
}
return delegate;
}
-2
View File
@@ -34,7 +34,6 @@ import { version as root } from '../../../../package.json';
import { version as appDefaults } from '../../../app-defaults/package.json';
import { version as backendCommon } from '../../../backend-common/package.json';
import { version as backendDefaults } from '../../../backend-defaults/package.json';
import { version as backendTasks } from '../../../backend-tasks/package.json';
import { version as catalogClient } from '../../../catalog-client/package.json';
import { version as catalogModel } from '../../../catalog-model/package.json';
import { version as cli } from '../../../cli/package.json';
@@ -91,7 +90,6 @@ export const packageVersions = {
'@backstage/app-defaults': appDefaults,
'@backstage/backend-common': backendCommon,
'@backstage/backend-defaults': backendDefaults,
'@backstage/backend-tasks': backendTasks,
'@backstage/catalog-client': catalogClient,
'@backstage/catalog-model': catalogModel,
'@backstage/cli': cli,
@@ -18,7 +18,6 @@
"dependencies": {
"@backstage/backend-common": "^{{version '@backstage/backend-common'}}",
"@backstage/backend-defaults": "^{{version '@backstage/backend-defaults'}}",
"@backstage/backend-tasks": "^{{version '@backstage/backend-tasks'}}",
"@backstage/config": "^{{version '@backstage/config'}}",
"@backstage/plugin-app-backend": "^{{version '@backstage/plugin-app-backend'}}",
"@backstage/plugin-auth-backend": "^{{version '@backstage/plugin-auth-backend'}}",
-23
View File
@@ -3771,29 +3771,6 @@ __metadata:
languageName: unknown
linkType: soft
"@backstage/backend-tasks@workspace:packages/backend-tasks":
version: 0.0.0-use.local
resolution: "@backstage/backend-tasks@workspace:packages/backend-tasks"
dependencies:
"@backstage/backend-common": "workspace:^"
"@backstage/backend-plugin-api": "workspace:^"
"@backstage/backend-test-utils": "workspace:^"
"@backstage/cli": "workspace:^"
"@backstage/config": "workspace:^"
"@backstage/errors": "workspace:^"
"@backstage/types": "workspace:^"
"@opentelemetry/api": ^1.3.0
"@types/luxon": ^3.0.0
cron: ^3.0.0
knex: ^3.0.0
lodash: ^4.17.21
luxon: ^3.0.0
uuid: ^9.0.0
wait-for-expect: ^3.0.2
zod: ^3.22.4
languageName: unknown
linkType: soft
"@backstage/backend-test-utils@workspace:^, @backstage/backend-test-utils@workspace:packages/backend-test-utils":
version: 0.0.0-use.local
resolution: "@backstage/backend-test-utils@workspace:packages/backend-test-utils"