refactor: delete the backend tasks package
Signed-off-by: Camila Belo <camilaibs@gmail.com>
This commit is contained in:
@@ -15,7 +15,6 @@
|
||||
"example-backend-legacy": "0.2.101",
|
||||
"@backstage/backend-openapi-utils": "0.1.16",
|
||||
"@backstage/backend-plugin-api": "0.8.0",
|
||||
"@backstage/backend-tasks": "0.6.0",
|
||||
"@backstage/backend-test-utils": "0.5.0",
|
||||
"@backstage/catalog-client": "1.6.6",
|
||||
"@backstage/catalog-model": "1.6.0",
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/create-app': patch
|
||||
---
|
||||
|
||||
Remove references to the `@backstage/backend-tasks` in versions of the `create-app` package, as it has been deprecated.
|
||||
@@ -159,7 +159,7 @@ The same applies for modules that perform their own migrations and interact with
|
||||
the database. They will run on the same logical database instance as the target
|
||||
plugin, so care must be taken to choose table names that do not risk colliding
|
||||
with those of the plugin. A recommended naming pattern is `<package
|
||||
name>__<table name>`, for example the `@backstage/backend-tasks` package creates
|
||||
name>__<table name>`, for example the `Scheduler Service` package creates
|
||||
tables named `backstage_backend_tasks__<table>`. If you use the default [`Knex` migration facilities](https://knexjs.org/guide/migrations.html), you will also
|
||||
want to make sure that it uses similarly prefixed migration state tables for its
|
||||
internal bookkeeping needs, so they do not collide with the main ones used by
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
module.exports = require('@backstage/cli/config/eslint-factory')(__dirname);
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,36 +0,0 @@
|
||||
# @backstage/backend-tasks
|
||||
|
||||
> [!CAUTION]
|
||||
> This package is deprecated and will be removed in a near future.
|
||||
|
||||
Common distributed task management for Backstage backends.
|
||||
|
||||
## Usage
|
||||
|
||||
> [!CAUTION]
|
||||
> Please note that the documentation below is only valid for versions equal to or below `0.5.28-next.3`.
|
||||
> As this package will be deleted soon, we recommend that you migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`. Here are the [backend](https://backstage.io/docs/backend-system/building-backends/migrating) and [plugin](https://backstage.io/docs/backend-system/building-plugins-and-modules/migrating) migration guides.
|
||||
|
||||
Add the library to your backend package:
|
||||
|
||||
```bash
|
||||
# From your Backstage root directory
|
||||
yarn --cwd packages/backend add @backstage/backend-tasks
|
||||
```
|
||||
|
||||
then make use of its facilities as necessary:
|
||||
|
||||
```typescript
|
||||
import { TaskScheduler } from '@backstage/backend-tasks';
|
||||
|
||||
const scheduler = TaskScheduler.fromConfig(rootConfig).forPlugin('my-plugin');
|
||||
|
||||
await scheduler.scheduleTask({
|
||||
id: 'refresh_things',
|
||||
frequency: { cron: '*/5 * * * *' }, // every 5 minutes, also supports Duration
|
||||
timeout: { minutes: 15 },
|
||||
fn: async () => {
|
||||
await entityProvider.run();
|
||||
},
|
||||
});
|
||||
```
|
||||
@@ -1,114 +0,0 @@
|
||||
## API Report File for "@backstage/backend-tasks"
|
||||
|
||||
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
|
||||
|
||||
```ts
|
||||
import { Config } from '@backstage/config';
|
||||
import { Duration } from 'luxon';
|
||||
import { HumanDuration as HumanDuration_2 } from '@backstage/types';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import { LegacyRootDatabaseService } from '@backstage/backend-common';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { PluginDatabaseManager } from '@backstage/backend-common';
|
||||
|
||||
// @public @deprecated
|
||||
export type HumanDuration = HumanDuration_2;
|
||||
|
||||
// @public @deprecated
|
||||
export interface PluginTaskScheduler {
|
||||
createScheduledTaskRunner(schedule: TaskScheduleDefinition): TaskRunner;
|
||||
getScheduledTasks(): Promise<TaskDescriptor[]>;
|
||||
scheduleTask(
|
||||
task: TaskScheduleDefinition & TaskInvocationDefinition,
|
||||
): Promise<void>;
|
||||
triggerTask(id: string): Promise<void>;
|
||||
}
|
||||
|
||||
// @public @deprecated
|
||||
export function readTaskScheduleDefinitionFromConfig(
|
||||
config: Config,
|
||||
): TaskScheduleDefinition;
|
||||
|
||||
// @public @deprecated
|
||||
export type TaskDescriptor = {
|
||||
id: string;
|
||||
scope: 'global' | 'local';
|
||||
settings: {
|
||||
version: number;
|
||||
} & JsonObject;
|
||||
};
|
||||
|
||||
// @public @deprecated
|
||||
export type TaskFunction =
|
||||
| ((abortSignal: AbortSignal) => void | Promise<void>)
|
||||
| (() => void | Promise<void>);
|
||||
|
||||
// @public @deprecated
|
||||
export interface TaskInvocationDefinition {
|
||||
fn: TaskFunction;
|
||||
id: string;
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
// @public @deprecated
|
||||
export interface TaskRunner {
|
||||
run(task: TaskInvocationDefinition): Promise<void>;
|
||||
}
|
||||
|
||||
// @public @deprecated
|
||||
export interface TaskScheduleDefinition {
|
||||
frequency:
|
||||
| {
|
||||
cron: string;
|
||||
}
|
||||
| Duration
|
||||
| HumanDuration_2
|
||||
/**
|
||||
* This task will only run when manually triggered with the `triggerTask` method; no automatic
|
||||
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
|
||||
*/
|
||||
| {
|
||||
trigger: 'manual';
|
||||
};
|
||||
initialDelay?: Duration | HumanDuration_2;
|
||||
scope?: 'global' | 'local';
|
||||
timeout: Duration | HumanDuration_2;
|
||||
}
|
||||
|
||||
// @public @deprecated
|
||||
export interface TaskScheduleDefinitionConfig {
|
||||
frequency:
|
||||
| {
|
||||
cron: string;
|
||||
}
|
||||
| string
|
||||
| HumanDuration_2;
|
||||
initialDelay?: string | HumanDuration_2;
|
||||
scope?: 'global' | 'local';
|
||||
timeout: string | HumanDuration_2;
|
||||
}
|
||||
|
||||
// @public @deprecated
|
||||
export class TaskScheduler {
|
||||
constructor(
|
||||
databaseManager: LegacyRootDatabaseService,
|
||||
logger: LoggerService,
|
||||
);
|
||||
// @deprecated
|
||||
forPlugin(pluginId: string): PluginTaskScheduler;
|
||||
// @deprecated (undocumented)
|
||||
static forPlugin(opts: {
|
||||
pluginId: string;
|
||||
databaseManager: PluginDatabaseManager;
|
||||
logger: LoggerService;
|
||||
}): PluginTaskScheduler;
|
||||
// @deprecated (undocumented)
|
||||
static fromConfig(
|
||||
config: Config,
|
||||
options?: {
|
||||
databaseManager?: LegacyRootDatabaseService;
|
||||
logger?: LoggerService;
|
||||
},
|
||||
): TaskScheduler;
|
||||
}
|
||||
```
|
||||
@@ -1,10 +0,0 @@
|
||||
apiVersion: backstage.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: backstage-backend-tasks
|
||||
title: '@backstage/backend-tasks'
|
||||
description: Common distributed task management library for Backstage backends
|
||||
spec:
|
||||
lifecycle: experimental
|
||||
type: backstage-node-library
|
||||
owner: maintainers
|
||||
@@ -1,2 +0,0 @@
|
||||
# Knip report
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
/*
|
||||
* Copyright 2020 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// @ts-check
|
||||
|
||||
/**
|
||||
* @param {import('knex').Knex} knex
|
||||
*/
|
||||
exports.up = async function up(knex) {
|
||||
//
|
||||
// tasks
|
||||
//
|
||||
await knex.schema.createTable('backstage_backend_tasks__tasks', table => {
|
||||
table.comment('Tasks used for scheduling work on multiple workers');
|
||||
table
|
||||
.string('id')
|
||||
.primary()
|
||||
.notNullable()
|
||||
.comment('The unique ID of this particular task');
|
||||
table
|
||||
.text('settings_json')
|
||||
.notNullable()
|
||||
.comment('JSON serialized object with properties for this task');
|
||||
table
|
||||
.dateTime('next_run_start_at')
|
||||
.notNullable()
|
||||
.comment('The next time that the task should be started');
|
||||
table
|
||||
.text('current_run_ticket')
|
||||
.nullable()
|
||||
.comment('A unique ticket for the current task run');
|
||||
table
|
||||
.dateTime('current_run_started_at')
|
||||
.nullable()
|
||||
.comment('The time that the current task run started');
|
||||
table
|
||||
.dateTime('current_run_expires_at')
|
||||
.nullable()
|
||||
.comment('The time that the current task run will time out');
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* @param {import('knex').Knex} knex
|
||||
*/
|
||||
exports.down = async function down(knex) {
|
||||
//
|
||||
// tasks
|
||||
//
|
||||
await knex.schema.dropTable('backstage_backend_tasks__tasks');
|
||||
};
|
||||
@@ -1,41 +0,0 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// @ts-check
|
||||
|
||||
/**
|
||||
* @param { import("knex").Knex } knex
|
||||
* @returns { Promise<void> }
|
||||
*/
|
||||
exports.up = async function up(knex) {
|
||||
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
|
||||
table.setNullable('next_run_start_at');
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* @param { import("knex").Knex } knex
|
||||
* @returns { Promise<void> }
|
||||
*/
|
||||
exports.down = async function down(knex) {
|
||||
await knex
|
||||
.delete()
|
||||
.from('backstage_backend_tasks__tasks')
|
||||
.where({ next_run_start_at: null });
|
||||
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
|
||||
table.dropNullable('next_run_start_at');
|
||||
});
|
||||
};
|
||||
@@ -1,58 +0,0 @@
|
||||
{
|
||||
"name": "@backstage/backend-tasks",
|
||||
"version": "0.6.2-next.0",
|
||||
"description": "Common distributed task management library for Backstage backends",
|
||||
"backstage": {
|
||||
"role": "node-library"
|
||||
},
|
||||
"publishConfig": {
|
||||
"access": "public",
|
||||
"main": "dist/index.cjs.js",
|
||||
"types": "dist/index.d.ts"
|
||||
},
|
||||
"keywords": [
|
||||
"backstage"
|
||||
],
|
||||
"homepage": "https://backstage.io",
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/backstage/backstage",
|
||||
"directory": "packages/backend-tasks"
|
||||
},
|
||||
"license": "Apache-2.0",
|
||||
"main": "src/index.ts",
|
||||
"types": "src/index.ts",
|
||||
"files": [
|
||||
"dist",
|
||||
"migrations/**/*.{js,d.ts}"
|
||||
],
|
||||
"scripts": {
|
||||
"build": "backstage-cli package build",
|
||||
"clean": "backstage-cli package clean",
|
||||
"lint": "backstage-cli package lint",
|
||||
"prepack": "backstage-cli package prepack",
|
||||
"postpack": "backstage-cli package postpack",
|
||||
"start": "backstage-cli package start",
|
||||
"test": "backstage-cli package test"
|
||||
},
|
||||
"dependencies": {
|
||||
"@backstage/backend-common": "workspace:^",
|
||||
"@backstage/backend-plugin-api": "workspace:^",
|
||||
"@backstage/config": "workspace:^",
|
||||
"@backstage/errors": "workspace:^",
|
||||
"@backstage/types": "workspace:^",
|
||||
"@opentelemetry/api": "^1.3.0",
|
||||
"@types/luxon": "^3.0.0",
|
||||
"cron": "^3.0.0",
|
||||
"knex": "^3.0.0",
|
||||
"lodash": "^4.17.21",
|
||||
"luxon": "^3.0.0",
|
||||
"uuid": "^9.0.0",
|
||||
"zod": "^3.22.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@backstage/backend-test-utils": "workspace:^",
|
||||
"@backstage/cli": "workspace:^",
|
||||
"wait-for-expect": "^3.0.2"
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { resolvePackagePath } from '@backstage/backend-plugin-api';
|
||||
import { Knex } from 'knex';
|
||||
import { DB_MIGRATIONS_TABLE } from './tables';
|
||||
|
||||
export async function migrateBackendTasks(knex: Knex): Promise<void> {
|
||||
const migrationsDir = resolvePackagePath(
|
||||
'@backstage/backend-tasks',
|
||||
'migrations',
|
||||
);
|
||||
|
||||
await knex.migrate.latest({
|
||||
directory: migrationsDir,
|
||||
tableName: DB_MIGRATIONS_TABLE,
|
||||
});
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export const DB_MIGRATIONS_TABLE = 'backstage_backend_tasks__knex_migrations';
|
||||
export const DB_TASKS_TABLE = 'backstage_backend_tasks__tasks';
|
||||
|
||||
export type DbTasksRow = {
|
||||
id: string;
|
||||
settings_json: string;
|
||||
next_run_start_at: Date;
|
||||
current_run_ticket?: string;
|
||||
current_run_started_at?: Date | string;
|
||||
current_run_expires_at?: Date | string;
|
||||
};
|
||||
@@ -1,27 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { HumanDuration as TypesHumanDuration } from '@backstage/types';
|
||||
|
||||
/**
|
||||
* Human friendly durations object.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Import from `@backstage/types` instead
|
||||
*/
|
||||
export type HumanDuration = TypesHumanDuration;
|
||||
|
||||
export * from './tasks';
|
||||
@@ -1,27 +0,0 @@
|
||||
/*
|
||||
* Copyright 2020 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Common distributed task management library for Backstage backends
|
||||
*
|
||||
* @remarks
|
||||
* This package is deprecated and will be removed in a near future.
|
||||
* Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`.
|
||||
*
|
||||
* @packageDocumentation
|
||||
*/
|
||||
|
||||
export * from './deprecated';
|
||||
@@ -1,123 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Knex } from 'knex';
|
||||
import { TestDatabases } from '@backstage/backend-test-utils';
|
||||
import fs from 'fs';
|
||||
|
||||
const migrationsDir = `${__dirname}/../migrations`;
|
||||
const migrationsFiles = fs.readdirSync(migrationsDir).sort();
|
||||
|
||||
async function migrateUpOnce(knex: Knex): Promise<void> {
|
||||
await knex.migrate.up({ directory: migrationsDir });
|
||||
}
|
||||
|
||||
async function migrateDownOnce(knex: Knex): Promise<void> {
|
||||
await knex.migrate.down({ directory: migrationsDir });
|
||||
}
|
||||
|
||||
async function migrateUntilBefore(knex: Knex, target: string): Promise<void> {
|
||||
const index = migrationsFiles.indexOf(target);
|
||||
if (index === -1) {
|
||||
throw new Error(`Migration ${target} not found`);
|
||||
}
|
||||
for (let i = 0; i < index; i++) {
|
||||
await migrateUpOnce(knex);
|
||||
}
|
||||
}
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
describe('migrations', () => {
|
||||
const databases = TestDatabases.create();
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'20210928160613_init.js, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
|
||||
await migrateUntilBefore(knex, '20210928160613_init.js');
|
||||
await migrateUpOnce(knex);
|
||||
|
||||
await knex('backstage_backend_tasks__tasks').insert({
|
||||
id: 'test',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: knex.fn.now(),
|
||||
});
|
||||
|
||||
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
|
||||
{
|
||||
id: 'test',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: expect.anything(),
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
},
|
||||
]);
|
||||
|
||||
await migrateDownOnce(knex);
|
||||
|
||||
// This looks odd - you might expect a .toThrow at the end but that
|
||||
// actually is flaky for some reason specifically on sqlite when
|
||||
// performing multiple runs in sequence
|
||||
await expect(knex('backstage_backend_tasks__tasks')).rejects.toEqual(
|
||||
expect.anything(),
|
||||
);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'20240712211735_nullable_next_run.js, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
|
||||
await migrateUntilBefore(knex, '20240712211735_nullable_next_run.js');
|
||||
await migrateUpOnce(knex);
|
||||
|
||||
await knex('backstage_backend_tasks__tasks').insert({
|
||||
id: 'test',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: knex.raw('null'),
|
||||
});
|
||||
|
||||
await expect(knex('backstage_backend_tasks__tasks')).resolves.toEqual([
|
||||
{
|
||||
id: 'test',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: null,
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
},
|
||||
]);
|
||||
|
||||
await migrateDownOnce(knex);
|
||||
|
||||
await expect(
|
||||
knex('backstage_backend_tasks__tasks').insert({
|
||||
id: 'test',
|
||||
settings_json: '{}',
|
||||
next_run_start_at: knex.raw('null'),
|
||||
}),
|
||||
).rejects.toEqual(expect.anything());
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
});
|
||||
@@ -1,25 +0,0 @@
|
||||
/*
|
||||
* Copyright 2020 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { TestDatabases } from '@backstage/backend-test-utils';
|
||||
import { Settings } from 'luxon';
|
||||
|
||||
// TS still thinks that methods can return null / placeholders, but we still want to throw as soon as possible when things go wrong
|
||||
Settings.throwOnInvalid = true;
|
||||
|
||||
TestDatabases.setDefaults({
|
||||
ids: ['MYSQL_8', 'POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'],
|
||||
});
|
||||
@@ -1,109 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
|
||||
describe('LocalTaskWorker', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
|
||||
it('runs the happy path (with iso duration) and handles cancellation', async () => {
|
||||
const fn = jest.fn();
|
||||
const controller = new AbortController();
|
||||
|
||||
const worker = new LocalTaskWorker('a', fn, logger);
|
||||
worker.start(
|
||||
{
|
||||
version: 2,
|
||||
initialDelayDuration: 'PT0.2S',
|
||||
cadence: 'PT0.2S',
|
||||
timeoutAfterDuration: 'PT1S',
|
||||
},
|
||||
{ signal: controller.signal },
|
||||
);
|
||||
|
||||
// TODO(freben): Rewrite to fake timers - tried, but it wouldn't work
|
||||
expect(fn).toHaveBeenCalledTimes(0);
|
||||
await new Promise(r => setTimeout(r, 100));
|
||||
expect(fn).toHaveBeenCalledTimes(0);
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
controller.abort();
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('runs the happy path (with a cron expression) and handles cancellation', async () => {
|
||||
const fn = jest.fn();
|
||||
const controller = new AbortController();
|
||||
|
||||
// Await until system time is just past a second boundary (since cron is
|
||||
// wall clock based)
|
||||
await new Promise(r => setTimeout(r, 1000 - (Date.now() % 1000) + 10));
|
||||
|
||||
const worker = new LocalTaskWorker('a', fn, logger);
|
||||
worker.start(
|
||||
{
|
||||
version: 2,
|
||||
initialDelayDuration: 'PT0.2S',
|
||||
cadence: '* * * * * *',
|
||||
timeoutAfterDuration: 'PT1S',
|
||||
},
|
||||
{ signal: controller.signal },
|
||||
);
|
||||
|
||||
// TODO(freben): Rewrite to fake timers - tried, but it wouldn't work
|
||||
expect(fn).toHaveBeenCalledTimes(0);
|
||||
await new Promise(r => setTimeout(r, 100));
|
||||
expect(fn).toHaveBeenCalledTimes(0);
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
await new Promise(r => setTimeout(r, 1000));
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
controller.abort();
|
||||
await new Promise(r => setTimeout(r, 1000));
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('can trigger to abort wait', async () => {
|
||||
const fn = jest.fn();
|
||||
const controller = new AbortController();
|
||||
|
||||
const worker = new LocalTaskWorker('a', fn, logger);
|
||||
worker.start(
|
||||
{
|
||||
version: 2,
|
||||
initialDelayDuration: 'PT0.2S',
|
||||
cadence: 'PT0.2S',
|
||||
timeoutAfterDuration: 'PT1S',
|
||||
},
|
||||
{ signal: controller.signal },
|
||||
);
|
||||
|
||||
// TODO(freben): Rewrite to fake timers - tried, but it wouldn't work
|
||||
expect(fn).toHaveBeenCalledTimes(0);
|
||||
await new Promise(r => setTimeout(r, 100));
|
||||
expect(fn).toHaveBeenCalledTimes(0);
|
||||
await new Promise(r => setTimeout(r, 200));
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
worker.trigger();
|
||||
await new Promise(r => setTimeout(r, 10));
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
controller.abort();
|
||||
});
|
||||
});
|
||||
@@ -1,154 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { ConflictError } from '@backstage/errors';
|
||||
import { CronTime } from 'cron';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { TaskFunction, TaskSettingsV2 } from './types';
|
||||
import { delegateAbortController, sleep } from './util';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
/**
|
||||
* Implements tasks that run locally without cross-host collaboration.
|
||||
*
|
||||
* @private
|
||||
*/
|
||||
export class LocalTaskWorker {
|
||||
private abortWait: AbortController | undefined;
|
||||
|
||||
constructor(
|
||||
private readonly taskId: string,
|
||||
private readonly fn: TaskFunction,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
|
||||
this.logger.info(
|
||||
`Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`,
|
||||
);
|
||||
|
||||
(async () => {
|
||||
let attemptNum = 1;
|
||||
for (;;) {
|
||||
try {
|
||||
if (settings.initialDelayDuration) {
|
||||
await this.sleep(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
options?.signal,
|
||||
);
|
||||
}
|
||||
|
||||
while (!options?.signal?.aborted) {
|
||||
const startTime = process.hrtime();
|
||||
await this.runOnce(settings, options?.signal);
|
||||
const timeTaken = process.hrtime(startTime);
|
||||
await this.waitUntilNext(
|
||||
settings,
|
||||
(timeTaken[0] + timeTaken[1] / 1e9) * 1000,
|
||||
options?.signal,
|
||||
);
|
||||
}
|
||||
|
||||
this.logger.info(`Task worker finished: ${this.taskId}`);
|
||||
attemptNum = 0;
|
||||
break;
|
||||
} catch (e) {
|
||||
attemptNum += 1;
|
||||
this.logger.warn(
|
||||
`Task worker failed unexpectedly, attempt number ${attemptNum}, ${e}`,
|
||||
);
|
||||
await sleep(Duration.fromObject({ seconds: 1 }));
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
trigger(): void {
|
||||
if (!this.abortWait) {
|
||||
throw new ConflictError(`Task ${this.taskId} is currently running`);
|
||||
}
|
||||
this.abortWait.abort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a single attempt at running the task to completion.
|
||||
*/
|
||||
private async runOnce(
|
||||
settings: TaskSettingsV2,
|
||||
signal?: AbortSignal,
|
||||
): Promise<void> {
|
||||
// Abort the task execution either if the worker is stopped, or if the
|
||||
// task timeout is hit
|
||||
const taskAbortController = delegateAbortController(signal);
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
taskAbortController.abort();
|
||||
}, Duration.fromISO(settings.timeoutAfterDuration).as('milliseconds'));
|
||||
|
||||
try {
|
||||
await this.fn(taskAbortController.signal);
|
||||
} catch (e) {
|
||||
// ignore intentionally
|
||||
}
|
||||
|
||||
// release resources
|
||||
clearTimeout(timeoutHandle);
|
||||
taskAbortController.abort();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleeps until it's time to run the task again.
|
||||
*/
|
||||
private async waitUntilNext(
|
||||
settings: TaskSettingsV2,
|
||||
lastRunMillis: number,
|
||||
signal?: AbortSignal,
|
||||
) {
|
||||
if (signal?.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
const isCron = !settings.cadence.startsWith('P');
|
||||
let dt: number;
|
||||
|
||||
if (isCron) {
|
||||
const nextRun = +new CronTime(settings.cadence).sendAt().toJSDate();
|
||||
dt = nextRun - Date.now();
|
||||
} else {
|
||||
dt =
|
||||
Duration.fromISO(settings.cadence).as('milliseconds') - lastRunMillis;
|
||||
}
|
||||
|
||||
dt = Math.max(dt, 0);
|
||||
|
||||
this.logger.debug(
|
||||
`task: ${this.taskId} will next occur around ${DateTime.now().plus(
|
||||
Duration.fromMillis(dt),
|
||||
)}`,
|
||||
);
|
||||
|
||||
await this.sleep(Duration.fromMillis(dt), signal);
|
||||
}
|
||||
|
||||
private async sleep(
|
||||
duration: Duration,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<void> {
|
||||
this.abortWait = delegateAbortController(abortSignal);
|
||||
await sleep(duration, this.abortWait.signal);
|
||||
this.abortWait.abort(); // cleans up resources
|
||||
this.abortWait = undefined;
|
||||
}
|
||||
}
|
||||
@@ -1,351 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
TestDatabaseId,
|
||||
TestDatabases,
|
||||
mockServices,
|
||||
} from '@backstage/backend-test-utils';
|
||||
import { ConflictError, NotFoundError } from '@backstage/errors';
|
||||
import { Duration } from 'luxon';
|
||||
import { migrateBackendTasks } from '../database/migrateBackendTasks';
|
||||
import {
|
||||
parseDuration,
|
||||
PluginTaskSchedulerImpl,
|
||||
} from './PluginTaskSchedulerImpl';
|
||||
|
||||
function defer() {
|
||||
let resolve = () => {};
|
||||
const promise = new Promise<void>(_resolve => {
|
||||
resolve = _resolve;
|
||||
});
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
describe('PluginTaskManagerImpl', () => {
|
||||
const databases = TestDatabases.create({
|
||||
ids: ['POSTGRES_16', 'POSTGRES_12', 'SQLITE_3'],
|
||||
});
|
||||
|
||||
beforeAll(async () => {
|
||||
// Make sure all databases are running before mocking timers, in case of testcontainers
|
||||
await Promise.all(
|
||||
databases.eachSupportedId().map(([id]) => databases.init(id)),
|
||||
);
|
||||
|
||||
jest.useFakeTimers();
|
||||
}, 60_000);
|
||||
|
||||
async function init(databaseId: TestDatabaseId) {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
const logger = mockServices.logger.mock();
|
||||
const manager = new PluginTaskSchedulerImpl(async () => knex, logger);
|
||||
return { knex, manager };
|
||||
}
|
||||
|
||||
// This is just to test the wrapper code; most of the actual tests are in
|
||||
// TaskWorker.test.ts
|
||||
describe('scheduleTask with global scope', () => {
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can run the v1 happy path, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromMillis(5000),
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can run the v2 happy path, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task2',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: { cron: '* * * * * *' },
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('triggerTask with global scope', () => {
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can manually trigger a task, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
initialDelay: Duration.fromObject({ years: 1 }),
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await manager.triggerTask('task1');
|
||||
jest.advanceTimersByTime(5000);
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'cant trigger a non-existent task, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await expect(() => manager.triggerTask('task2')).rejects.toThrow(
|
||||
NotFoundError,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'cant trigger a running task, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const { promise, resolve } = defer();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
fn: async () => {
|
||||
resolve();
|
||||
await new Promise(r => setTimeout(r, 20000));
|
||||
},
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await promise;
|
||||
await expect(() => manager.triggerTask('task1')).rejects.toThrow(
|
||||
ConflictError,
|
||||
);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
// This is just to test the wrapper code; most of the actual tests are in
|
||||
// TaskWorker.test.ts
|
||||
describe('scheduleTask with local scope', () => {
|
||||
it('can run the v1 happy path', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: { milliseconds: 5000 },
|
||||
frequency: { milliseconds: 5000 },
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
}, 60_000);
|
||||
|
||||
it('can run the v2 happy path', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task2',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: { cron: '* * * * * *' },
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
}, 60_000);
|
||||
});
|
||||
|
||||
describe('triggerTask with local scope', () => {
|
||||
it('can manually trigger a task', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
initialDelay: Duration.fromObject({ years: 1 }),
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await manager.triggerTask('task1');
|
||||
jest.advanceTimersByTime(5000);
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
}, 60_000);
|
||||
|
||||
it('cant trigger a non-existent task', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const fn = jest.fn();
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await expect(() => manager.triggerTask('task2')).rejects.toThrow(
|
||||
NotFoundError,
|
||||
);
|
||||
}, 60_000);
|
||||
|
||||
it('cant trigger a running task', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const { promise, resolve } = defer();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
fn: async () => {
|
||||
resolve();
|
||||
await new Promise(r => setTimeout(r, 20000));
|
||||
},
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await promise;
|
||||
await expect(() => manager.triggerTask('task1')).rejects.toThrow(
|
||||
ConflictError,
|
||||
);
|
||||
}, 60_000);
|
||||
});
|
||||
|
||||
// This is just to test the wrapper code; most of the actual tests are in
|
||||
// TaskWorker.test.ts
|
||||
describe('createScheduledTaskRunner', () => {
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can run the happy path, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager
|
||||
.createScheduledTaskRunner({
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromMillis(5000),
|
||||
scope: 'global',
|
||||
})
|
||||
.run({
|
||||
id: 'task1',
|
||||
fn,
|
||||
});
|
||||
|
||||
await promise;
|
||||
expect(fn).toHaveBeenCalledWith(expect.any(AbortSignal));
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('can fetch task ids', () => {
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can fetch both global and local task ids, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
const fn = jest.fn();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromMillis(5000),
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task2',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromMillis(5000),
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await expect(manager.getScheduledTasks()).resolves.toEqual([
|
||||
{
|
||||
id: 'task1',
|
||||
scope: 'global',
|
||||
settings: expect.objectContaining({ cadence: 'PT5S' }),
|
||||
},
|
||||
{
|
||||
id: 'task2',
|
||||
scope: 'local',
|
||||
settings: expect.objectContaining({ cadence: 'PT5S' }),
|
||||
},
|
||||
]);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('parseDuration', () => {
|
||||
it('should parse durations', () => {
|
||||
expect(parseDuration({ milliseconds: 5000 })).toEqual('PT5S');
|
||||
expect(parseDuration(Duration.fromMillis(5000))).toEqual('PT5S');
|
||||
expect(parseDuration({ cron: '1 * * * *' })).toEqual('1 * * * *');
|
||||
expect(parseDuration({ trigger: 'manual' })).toEqual('manual');
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,170 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { TaskWorker } from './TaskWorker';
|
||||
import {
|
||||
PluginTaskScheduler,
|
||||
TaskDescriptor,
|
||||
TaskFunction,
|
||||
TaskInvocationDefinition,
|
||||
TaskRunner,
|
||||
TaskScheduleDefinition,
|
||||
TaskSettingsV2,
|
||||
} from './types';
|
||||
import { validateId } from './util';
|
||||
import { Counter, Histogram, metrics } from '@opentelemetry/api';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
/**
|
||||
* Implements the actual task management.
|
||||
*/
|
||||
export class PluginTaskSchedulerImpl implements PluginTaskScheduler {
|
||||
private readonly localTasksById = new Map<string, LocalTaskWorker>();
|
||||
private readonly allScheduledTasks: TaskDescriptor[] = [];
|
||||
|
||||
private readonly counter: Counter;
|
||||
private readonly duration: Histogram;
|
||||
|
||||
constructor(
|
||||
private readonly databaseFactory: () => Promise<Knex>,
|
||||
private readonly logger: LoggerService,
|
||||
) {
|
||||
const meter = metrics.getMeter('default');
|
||||
this.counter = meter.createCounter('backend_tasks.task.runs.count', {
|
||||
description: 'Total number of times a task has been run',
|
||||
});
|
||||
this.duration = meter.createHistogram('backend_tasks.task.runs.duration', {
|
||||
description: 'Histogram of task run durations',
|
||||
unit: 'seconds',
|
||||
});
|
||||
}
|
||||
|
||||
async triggerTask(id: string): Promise<void> {
|
||||
const localTask = this.localTasksById.get(id);
|
||||
if (localTask) {
|
||||
localTask.trigger();
|
||||
return;
|
||||
}
|
||||
|
||||
const knex = await this.databaseFactory();
|
||||
await TaskWorker.trigger(knex, id);
|
||||
}
|
||||
|
||||
async scheduleTask(
|
||||
task: TaskScheduleDefinition & TaskInvocationDefinition,
|
||||
): Promise<void> {
|
||||
validateId(task.id);
|
||||
const scope = task.scope ?? 'global';
|
||||
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: parseDuration(task.frequency),
|
||||
initialDelayDuration:
|
||||
task.initialDelay && parseDuration(task.initialDelay),
|
||||
timeoutAfterDuration: parseDuration(task.timeout),
|
||||
};
|
||||
|
||||
if (scope === 'global') {
|
||||
const knex = await this.databaseFactory();
|
||||
const worker = new TaskWorker(
|
||||
task.id,
|
||||
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
|
||||
knex,
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
await worker.start(settings, { signal: task.signal });
|
||||
} else {
|
||||
const worker = new LocalTaskWorker(
|
||||
task.id,
|
||||
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
worker.start(settings, { signal: task.signal });
|
||||
this.localTasksById.set(task.id, worker);
|
||||
}
|
||||
|
||||
this.allScheduledTasks.push({
|
||||
id: task.id,
|
||||
scope: scope,
|
||||
settings: settings,
|
||||
});
|
||||
}
|
||||
|
||||
createScheduledTaskRunner(schedule: TaskScheduleDefinition): TaskRunner {
|
||||
return {
|
||||
run: async task => {
|
||||
await this.scheduleTask({ ...task, ...schedule });
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async getScheduledTasks(): Promise<TaskDescriptor[]> {
|
||||
return this.allScheduledTasks;
|
||||
}
|
||||
|
||||
private wrapInMetrics(
|
||||
fn: TaskFunction,
|
||||
opts: { labels: Record<string, string> },
|
||||
): TaskFunction {
|
||||
return async abort => {
|
||||
const labels = {
|
||||
...opts.labels,
|
||||
};
|
||||
this.counter.add(1, { ...labels, result: 'started' });
|
||||
|
||||
const startTime = process.hrtime();
|
||||
|
||||
try {
|
||||
await fn(abort);
|
||||
labels.result = 'completed';
|
||||
} catch (ex) {
|
||||
labels.result = 'failed';
|
||||
throw ex;
|
||||
} finally {
|
||||
const delta = process.hrtime(startTime);
|
||||
const endTime = delta[0] + delta[1] / 1e9;
|
||||
this.counter.add(1, labels);
|
||||
this.duration.record(endTime, labels);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export function parseDuration(
|
||||
frequency: TaskScheduleDefinition['frequency'],
|
||||
): string {
|
||||
if ('cron' in frequency) {
|
||||
return frequency.cron;
|
||||
}
|
||||
if ('trigger' in frequency) {
|
||||
return frequency.trigger;
|
||||
}
|
||||
|
||||
const parsed = Duration.isDuration(frequency)
|
||||
? frequency
|
||||
: Duration.fromObject(frequency);
|
||||
|
||||
if (!parsed.isValid) {
|
||||
throw new Error(
|
||||
`Invalid duration, ${parsed.invalidReason}: ${parsed.invalidExplanation}`,
|
||||
);
|
||||
}
|
||||
|
||||
return parsed.toISO()!;
|
||||
}
|
||||
@@ -1,95 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { TestDatabases, mockServices } from '@backstage/backend-test-utils';
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import waitForExpect from 'wait-for-expect';
|
||||
import { migrateBackendTasks } from '../database/migrateBackendTasks';
|
||||
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
|
||||
import { PluginTaskSchedulerJanitor } from './PluginTaskSchedulerJanitor';
|
||||
import { createTestScopedSignal } from './__testUtils__/createTestScopedSignal';
|
||||
|
||||
const insertTask = async (knex: Knex, task: DbTasksRow) => {
|
||||
return knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.insert(task)
|
||||
.onConflict('id')
|
||||
.merge(['settings_json']);
|
||||
};
|
||||
|
||||
const getTask = async (knex: Knex): Promise<DbTasksRow> => {
|
||||
return (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
};
|
||||
|
||||
describe('PluginTaskSchedulerJanitor', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const databases = TestDatabases.create({
|
||||
ids: [
|
||||
/* 'MYSQL_8' not supported yet */
|
||||
'POSTGRES_16',
|
||||
'POSTGRES_12',
|
||||
'SQLITE_3',
|
||||
'MYSQL_8',
|
||||
],
|
||||
});
|
||||
const testScopedSignal = createTestScopedSignal();
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'Should update date if current_run_expires_at expires, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const dateYesterday = new Date(
|
||||
new Date().setDate(new Date().getDate() - 1),
|
||||
);
|
||||
|
||||
await insertTask(knex, {
|
||||
id: 'task1',
|
||||
settings_json: '',
|
||||
next_run_start_at: new Date('2023-03-01 00:00:00'),
|
||||
current_run_ticket: 'ticket',
|
||||
current_run_started_at: dateYesterday,
|
||||
current_run_expires_at: dateYesterday,
|
||||
});
|
||||
|
||||
const worker = new PluginTaskSchedulerJanitor({
|
||||
waitBetweenRuns: Duration.fromObject({ milliseconds: 20 }),
|
||||
knex,
|
||||
logger,
|
||||
});
|
||||
|
||||
worker.start(testScopedSignal());
|
||||
|
||||
await waitForExpect(async () => {
|
||||
await expect(getTask(knex)).resolves.toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
}),
|
||||
);
|
||||
});
|
||||
},
|
||||
);
|
||||
});
|
||||
@@ -1,96 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
|
||||
import { sleep } from './util';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
/**
|
||||
* Makes sure to auto-expire and clean up things that time out or for other
|
||||
* reasons should not be left lingering.
|
||||
*/
|
||||
export class PluginTaskSchedulerJanitor {
|
||||
private readonly knex: Knex;
|
||||
private readonly waitBetweenRuns: Duration;
|
||||
private readonly logger: LoggerService;
|
||||
|
||||
constructor(options: {
|
||||
knex: Knex;
|
||||
waitBetweenRuns: Duration;
|
||||
logger: LoggerService;
|
||||
}) {
|
||||
this.knex = options.knex;
|
||||
this.waitBetweenRuns = options.waitBetweenRuns;
|
||||
this.logger = options.logger;
|
||||
}
|
||||
|
||||
async start(abortSignal?: AbortSignal) {
|
||||
while (!abortSignal?.aborted) {
|
||||
try {
|
||||
await this.runOnce();
|
||||
} catch (e) {
|
||||
this.logger.warn(`Error while performing janitorial tasks, ${e}`);
|
||||
}
|
||||
|
||||
await sleep(this.waitBetweenRuns, abortSignal);
|
||||
}
|
||||
}
|
||||
|
||||
private async runOnce() {
|
||||
const dbNull = this.knex.raw('null');
|
||||
const configClient = this.knex.client.config.client;
|
||||
|
||||
let tasks: Array<{ id: string }>;
|
||||
if (configClient.includes('sqlite3') || configClient.includes('mysql')) {
|
||||
tasks = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.select('id')
|
||||
.where('current_run_expires_at', '<', this.knex.fn.now());
|
||||
await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.whereIn(
|
||||
'id',
|
||||
tasks.map(t => t.id),
|
||||
)
|
||||
.update({
|
||||
current_run_ticket: dbNull,
|
||||
current_run_started_at: dbNull,
|
||||
current_run_expires_at: dbNull,
|
||||
});
|
||||
} else {
|
||||
tasks = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('current_run_expires_at', '<', this.knex.fn.now())
|
||||
.update({
|
||||
current_run_ticket: dbNull,
|
||||
current_run_started_at: dbNull,
|
||||
current_run_expires_at: dbNull,
|
||||
})
|
||||
.returning(['id']);
|
||||
}
|
||||
|
||||
// In rare cases, knex drivers may ignore "returning", and return the number
|
||||
// of rows changed instead
|
||||
if (typeof tasks === 'number') {
|
||||
if (tasks > 0) {
|
||||
this.logger.warn(`${tasks} tasks timed out and were lost`);
|
||||
}
|
||||
} else {
|
||||
for (const { id } of tasks) {
|
||||
this.logger.warn(`Task timed out and was lost: ${id}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,88 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { DatabaseManager } from '@backstage/backend-common';
|
||||
import {
|
||||
TestDatabaseId,
|
||||
TestDatabases,
|
||||
mockServices,
|
||||
} from '@backstage/backend-test-utils';
|
||||
import { Duration } from 'luxon';
|
||||
import waitForExpect from 'wait-for-expect';
|
||||
import { TaskScheduler } from './TaskScheduler';
|
||||
import { createTestScopedSignal } from './__testUtils__/createTestScopedSignal';
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
describe('TaskScheduler', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const databases = TestDatabases.create();
|
||||
const testScopedSignal = createTestScopedSignal();
|
||||
|
||||
async function createDatabase(
|
||||
databaseId: TestDatabaseId,
|
||||
): Promise<DatabaseManager> {
|
||||
const knex = await databases.init(databaseId);
|
||||
const databaseManager: Partial<DatabaseManager> = {
|
||||
forPlugin: () => ({
|
||||
getClient: async () => knex,
|
||||
}),
|
||||
};
|
||||
return databaseManager as DatabaseManager;
|
||||
}
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can return a working v1 plugin impl, %p',
|
||||
async databaseId => {
|
||||
const database = await createDatabase(databaseId);
|
||||
const manager = new TaskScheduler(database, logger).forPlugin('test');
|
||||
const fn = jest.fn();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromMillis(5000),
|
||||
signal: testScopedSignal(),
|
||||
fn,
|
||||
});
|
||||
|
||||
await waitForExpect(() => {
|
||||
expect(fn).toHaveBeenCalled();
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can return a working v2 plugin impl, %p',
|
||||
async databaseId => {
|
||||
const database = await createDatabase(databaseId);
|
||||
const manager = new TaskScheduler(database, logger).forPlugin('test');
|
||||
const fn = jest.fn();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task2',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: { cron: '* * * * * *' },
|
||||
signal: testScopedSignal(),
|
||||
fn,
|
||||
});
|
||||
|
||||
await waitForExpect(() => {
|
||||
expect(fn).toHaveBeenCalled();
|
||||
});
|
||||
},
|
||||
);
|
||||
});
|
||||
@@ -1,107 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
DatabaseManager,
|
||||
getRootLogger,
|
||||
LegacyRootDatabaseService,
|
||||
PluginDatabaseManager,
|
||||
} from '@backstage/backend-common';
|
||||
import { Config } from '@backstage/config';
|
||||
import { once } from 'lodash';
|
||||
import { Duration } from 'luxon';
|
||||
import { migrateBackendTasks } from '../database/migrateBackendTasks';
|
||||
import { PluginTaskSchedulerImpl } from './PluginTaskSchedulerImpl';
|
||||
import { PluginTaskSchedulerJanitor } from './PluginTaskSchedulerJanitor';
|
||||
import { PluginTaskScheduler } from './types';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
/**
|
||||
* Deals with the scheduling of distributed tasks.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`
|
||||
*/
|
||||
export class TaskScheduler {
|
||||
/**
|
||||
* @deprecated
|
||||
* It is only used by the legacy backend system, and should not be used in the new backend system.
|
||||
*/
|
||||
static fromConfig(
|
||||
config: Config,
|
||||
options?: {
|
||||
databaseManager?: LegacyRootDatabaseService;
|
||||
logger?: LoggerService;
|
||||
},
|
||||
): TaskScheduler {
|
||||
const databaseManager =
|
||||
options?.databaseManager ?? DatabaseManager.fromConfig(config);
|
||||
const logger = (options?.logger || getRootLogger()).child({
|
||||
type: 'taskManager',
|
||||
});
|
||||
return new TaskScheduler(databaseManager, logger);
|
||||
}
|
||||
|
||||
constructor(
|
||||
private readonly databaseManager: LegacyRootDatabaseService,
|
||||
private readonly logger: LoggerService,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Instantiates a task manager instance for the given plugin.
|
||||
*
|
||||
* @param pluginId - The unique ID of the plugin, for example "catalog"
|
||||
* @returns A {@link PluginTaskScheduler} instance
|
||||
* @deprecated Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`
|
||||
*/
|
||||
forPlugin(pluginId: string): PluginTaskScheduler {
|
||||
return TaskScheduler.forPlugin({
|
||||
pluginId,
|
||||
databaseManager: this.databaseManager.forPlugin(pluginId),
|
||||
logger: this.logger,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated Please migrate to the new backend system, and depend on `coreServices.scheduler` from `@backstage/backend-plugin-api` instead, or use `DefaultSchedulerService` from `@backstage/backend-defaults`
|
||||
*/
|
||||
static forPlugin(opts: {
|
||||
pluginId: string;
|
||||
databaseManager: PluginDatabaseManager;
|
||||
logger: LoggerService;
|
||||
}): PluginTaskScheduler {
|
||||
const databaseFactory = once(async () => {
|
||||
const knex = await opts.databaseManager.getClient();
|
||||
|
||||
if (!opts.databaseManager.migrations?.skip) {
|
||||
await migrateBackendTasks(knex);
|
||||
}
|
||||
|
||||
if (process.env.NODE_ENV !== 'test') {
|
||||
const janitor = new PluginTaskSchedulerJanitor({
|
||||
knex,
|
||||
waitBetweenRuns: Duration.fromObject({ minutes: 1 }),
|
||||
logger: opts.logger,
|
||||
});
|
||||
janitor.start();
|
||||
}
|
||||
|
||||
return knex;
|
||||
});
|
||||
|
||||
return new PluginTaskSchedulerImpl(databaseFactory, opts.logger);
|
||||
}
|
||||
}
|
||||
@@ -1,534 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { TestDatabases, mockServices } from '@backstage/backend-test-utils';
|
||||
import { Duration, DateTime } from 'luxon';
|
||||
import waitForExpect from 'wait-for-expect';
|
||||
import { migrateBackendTasks } from '../database/migrateBackendTasks';
|
||||
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
|
||||
import { TaskWorker } from './TaskWorker';
|
||||
import { TaskSettingsV2 } from './types';
|
||||
import { createTestScopedSignal } from './__testUtils__/createTestScopedSignal';
|
||||
|
||||
jest.setTimeout(60_000);
|
||||
|
||||
describe('TaskWorker', () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const databases = TestDatabases.create();
|
||||
const testScopedSignal = createTestScopedSignal();
|
||||
|
||||
beforeEach(() => {
|
||||
jest.resetAllMocks();
|
||||
});
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'goes through the expected states, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: '*/2 * * * * *',
|
||||
initialDelayDuration: Duration.fromObject({ seconds: 1 }).toISO()!,
|
||||
timeoutAfterDuration: Duration.fromObject({ minutes: 1 }).toISO()!,
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task1', fn, knex, logger);
|
||||
await worker.persistTask(settings);
|
||||
|
||||
let row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
}),
|
||||
);
|
||||
expect(JSON.parse(row.settings_json)).toEqual({
|
||||
version: 2,
|
||||
cadence: '*/2 * * * * *',
|
||||
initialDelayDuration: 'PT1S',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
});
|
||||
|
||||
await expect(worker.findReadyTask()).resolves.toEqual({
|
||||
result: 'not-ready-yet',
|
||||
});
|
||||
|
||||
await waitForExpect(async () => {
|
||||
await expect(worker.findReadyTask()).resolves.toEqual({
|
||||
result: 'ready',
|
||||
settings,
|
||||
});
|
||||
});
|
||||
|
||||
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
}),
|
||||
);
|
||||
|
||||
await expect(worker.tryClaimTask('ticket', settings)).resolves.toBe(true);
|
||||
|
||||
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: 'ticket',
|
||||
current_run_started_at: expect.anything(),
|
||||
current_run_expires_at: expect.anything(),
|
||||
}),
|
||||
);
|
||||
|
||||
await expect(worker.tryReleaseTask('ticket', settings)).resolves.toBe(
|
||||
true,
|
||||
);
|
||||
|
||||
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: null,
|
||||
current_run_started_at: null,
|
||||
current_run_expires_at: null,
|
||||
}),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'logs error when the task throws, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
jest.spyOn(logger, 'error');
|
||||
const fn = jest.fn().mockRejectedValue(new Error('failed'));
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
initialDelayDuration: undefined,
|
||||
cadence: '* * * * * *',
|
||||
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
|
||||
};
|
||||
const checkFrequency = Duration.fromObject({ milliseconds: 100 });
|
||||
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
|
||||
worker.start(settings, { signal: testScopedSignal() });
|
||||
|
||||
await waitForExpect(() => {
|
||||
expect(logger.error).toHaveBeenCalled();
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'runs tasks more than once even when the task throws, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn().mockRejectedValue(new Error('failed'));
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
initialDelayDuration: undefined,
|
||||
cadence: '* * * * * *',
|
||||
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
|
||||
};
|
||||
const checkFrequency = Duration.fromObject({ milliseconds: 100 });
|
||||
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
|
||||
worker.start(settings, { signal: testScopedSignal() });
|
||||
|
||||
await waitForExpect(() => {
|
||||
expect(fn).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'does not clobber ticket lock when stolen, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
initialDelayDuration: undefined,
|
||||
cadence: '* * * * * *',
|
||||
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task1', fn, knex, logger);
|
||||
await worker.persistTask(settings);
|
||||
|
||||
await waitForExpect(async () => {
|
||||
await expect(worker.findReadyTask()).resolves.toEqual({
|
||||
result: 'ready',
|
||||
settings,
|
||||
});
|
||||
});
|
||||
|
||||
await expect(worker.tryClaimTask('ticket', settings)).resolves.toBe(true);
|
||||
|
||||
let row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: 'ticket',
|
||||
current_run_started_at: expect.anything(),
|
||||
current_run_expires_at: expect.anything(),
|
||||
}),
|
||||
);
|
||||
|
||||
await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', 'task1')
|
||||
.update({ current_run_ticket: 'stolen' });
|
||||
|
||||
await expect(worker.tryReleaseTask('ticket', settings)).resolves.toBe(
|
||||
false,
|
||||
);
|
||||
|
||||
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row).toEqual(
|
||||
expect.objectContaining({
|
||||
id: 'task1',
|
||||
current_run_ticket: 'stolen',
|
||||
current_run_started_at: expect.anything(),
|
||||
current_run_expires_at: expect.anything(),
|
||||
}),
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'gracefully handles a disappeared task row, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(async () => {});
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
initialDelayDuration: undefined,
|
||||
cadence: '* * * * * *',
|
||||
timeoutAfterDuration: Duration.fromMillis(60000).toISO()!,
|
||||
};
|
||||
|
||||
const worker1 = new TaskWorker('task1', fn, knex, logger);
|
||||
await worker1.persistTask(settings);
|
||||
await knex<DbTasksRow>(DB_TASKS_TABLE).where('id', '=', 'task1').delete();
|
||||
await expect(worker1.findReadyTask()).resolves.toEqual({
|
||||
result: 'abort',
|
||||
});
|
||||
|
||||
const worker2 = new TaskWorker('task2', fn, knex, logger);
|
||||
await worker2.persistTask(settings);
|
||||
|
||||
await waitForExpect(async () => {
|
||||
await expect(worker2.findReadyTask()).resolves.toEqual({
|
||||
result: 'ready',
|
||||
settings,
|
||||
});
|
||||
});
|
||||
|
||||
await knex<DbTasksRow>(DB_TASKS_TABLE).where('id', '=', 'task2').delete();
|
||||
await expect(worker2.tryClaimTask('ticket', settings)).resolves.toBe(
|
||||
false,
|
||||
);
|
||||
|
||||
const worker3 = new TaskWorker('task3', fn, knex, logger);
|
||||
await worker3.persistTask(settings);
|
||||
|
||||
await waitForExpect(async () => {
|
||||
await expect(worker3.findReadyTask()).resolves.toEqual({
|
||||
result: 'ready',
|
||||
settings,
|
||||
});
|
||||
});
|
||||
|
||||
await expect(worker3.tryClaimTask('ticket', settings)).resolves.toBe(
|
||||
true,
|
||||
);
|
||||
await knex<DbTasksRow>(DB_TASKS_TABLE).where('id', '=', 'task3').delete();
|
||||
await expect(worker3.tryReleaseTask('ticket', settings)).resolves.toBe(
|
||||
false,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'respects initialDelayDuration per worker, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const abortFirst = new AbortController();
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
initialDelayDuration: 'PT0.3S',
|
||||
cadence: 'PT0.1S',
|
||||
timeoutAfterDuration: 'PT10S',
|
||||
};
|
||||
|
||||
// Start a single worker and make sure it waits and then goes to work
|
||||
const fn1 = jest.fn(async () => {});
|
||||
const worker1 = new TaskWorker(
|
||||
'task1',
|
||||
fn1,
|
||||
knex,
|
||||
logger,
|
||||
Duration.fromMillis(10),
|
||||
);
|
||||
await worker1.start(settings, { signal: abortFirst.signal });
|
||||
|
||||
expect(fn1).toHaveBeenCalledTimes(0);
|
||||
await new Promise(resolve => setTimeout(resolve, 250));
|
||||
expect(fn1).toHaveBeenCalledTimes(0);
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
expect(fn1.mock.calls.length).toBeGreaterThan(0);
|
||||
|
||||
// Start a second worker and make sure it waits but the first worker still works along
|
||||
const fn2 = jest.fn();
|
||||
const promise2 = new Promise(resolve => fn2.mockImplementation(resolve));
|
||||
const worker2 = new TaskWorker(
|
||||
'task1',
|
||||
fn2,
|
||||
knex,
|
||||
logger,
|
||||
Duration.fromMillis(10),
|
||||
);
|
||||
await worker2.start(settings, { signal: testScopedSignal() });
|
||||
|
||||
// We eventually abort the first worker just to make sure that the second
|
||||
// one for sure will get a go at running the task
|
||||
setTimeout(() => abortFirst.abort(), 1000);
|
||||
|
||||
const before = fn1.mock.calls.length;
|
||||
await promise2;
|
||||
expect(fn1.mock.calls.length).toBeGreaterThan(before);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'next_run_start_at is always the min between schedule changes from cron frequency, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: '*/15 * * * *',
|
||||
initialDelayDuration: 'PT2M',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task99', fn, knex, logger);
|
||||
await worker.persistTask(settings);
|
||||
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
const settings2 = {
|
||||
...settings,
|
||||
cadence: '*/2 * * * *',
|
||||
initialDelayDuration: 'PT1M',
|
||||
};
|
||||
await worker.persistTask(settings2);
|
||||
const row2 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
expect(row2.next_run_start_at).not.toStrictEqual(row1.next_run_start_at);
|
||||
|
||||
const settings3 = { ...settings };
|
||||
await worker.persistTask(settings3);
|
||||
const row3 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
// The new timestamp can basically be 0 or a minute depending on how the
|
||||
// initialDelayDuration falls right on a cron boundary. This kinda
|
||||
// contrived check removes a test flakiness based on wall clock time.
|
||||
expect(
|
||||
Math.abs(
|
||||
+new Date(row3.next_run_start_at) - +new Date(row2.next_run_start_at),
|
||||
),
|
||||
).toBeLessThanOrEqual(60_000);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'next_run_start_at is always the min between schedule changes when using human duration frequency, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
|
||||
const initialSettings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: 'PT120M',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task99', fn, knex, logger);
|
||||
await worker.persistTask(initialSettings);
|
||||
// replicate task running, sets next_run_start_at based on cadence
|
||||
await worker.tryClaimTask('ticket', initialSettings);
|
||||
await worker.tryReleaseTask('ticket', initialSettings);
|
||||
|
||||
// grab initial row for comparisons later
|
||||
const rowAfterClaimAndRelease = (
|
||||
await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
)[0];
|
||||
|
||||
const settings: TaskSettingsV2 = {
|
||||
...initialSettings,
|
||||
cadence: 'PT60M',
|
||||
};
|
||||
await worker.persistTask(settings);
|
||||
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
|
||||
new Date(rowAfterClaimAndRelease.next_run_start_at),
|
||||
);
|
||||
const row1NextStartAt = DateTime.fromJSDate(
|
||||
new Date(row1.next_run_start_at),
|
||||
);
|
||||
const now = DateTime.now();
|
||||
expect(
|
||||
rowAfterClaimAndReleaseNextStartAt.diff(row1NextStartAt).as('minutes'),
|
||||
).toBeCloseTo(60, 1); // ensure that next start at is sooner than initial by one hour
|
||||
expect(row1NextStartAt.diff(now).as('minutes')).toBeCloseTo(60, 1); // ensure that next start at is later than now by one hour
|
||||
expect(
|
||||
rowAfterClaimAndReleaseNextStartAt.diff(now).as('minutes'),
|
||||
).toBeCloseTo(120, 1);
|
||||
|
||||
const settings2 = {
|
||||
...settings,
|
||||
};
|
||||
await worker.persistTask(settings2);
|
||||
const row2 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
expect(row2.next_run_start_at).toStrictEqual(row1.next_run_start_at);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'next_run_start_at is always the min between schedule changes when using human duration frequency with initial start delay, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
|
||||
const initialSettings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: 'PT120M',
|
||||
initialDelayDuration: 'PT2M',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task99', fn, knex, logger);
|
||||
await worker.persistTask(initialSettings);
|
||||
// replicate task running, sets next_run_start_at based on cadence
|
||||
await worker.tryClaimTask('ticket', initialSettings);
|
||||
await worker.tryReleaseTask('ticket', initialSettings);
|
||||
|
||||
// grab initial row for comparisons later
|
||||
const rowAfterClaimAndRelease = (
|
||||
await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
)[0];
|
||||
|
||||
const settings: TaskSettingsV2 = {
|
||||
...initialSettings,
|
||||
cadence: 'PT60M',
|
||||
};
|
||||
await worker.persistTask(settings);
|
||||
const row1 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
const rowAfterClaimAndReleaseNextStartAt = DateTime.fromJSDate(
|
||||
new Date(rowAfterClaimAndRelease.next_run_start_at),
|
||||
);
|
||||
const row1NextStartAt = DateTime.fromJSDate(
|
||||
new Date(row1.next_run_start_at),
|
||||
);
|
||||
const now = DateTime.now();
|
||||
expect(
|
||||
rowAfterClaimAndReleaseNextStartAt.diff(row1NextStartAt).as('minutes'),
|
||||
).toBeCloseTo(62, 1); // ensure that next start at is sooner than initial by one hour, plus the 2 minute delay (set my tryReleaseTask)
|
||||
expect(row1NextStartAt.diff(now).as('minutes')).toBeCloseTo(60, 1); // ensure that next start at is later than now by one hour (2 minute delay doesn't take effect here)
|
||||
expect(
|
||||
rowAfterClaimAndReleaseNextStartAt.diff(now).as('minutes'),
|
||||
).toBeCloseTo(122, 1); // includes 2 minute start delay (which is persisted from tryReleaseTask)
|
||||
|
||||
const settings2 = {
|
||||
...settings,
|
||||
};
|
||||
await worker.persistTask(settings2);
|
||||
const row2 = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
|
||||
expect(row2.next_run_start_at).toStrictEqual(row1.next_run_start_at);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'next_run_start_at is not set for manually-triggered tasks, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
|
||||
const initialSettings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: 'manual',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task99', fn, knex, logger);
|
||||
await worker.persistTask(initialSettings);
|
||||
await worker.tryClaimTask('ticket', initialSettings);
|
||||
await worker.tryReleaseTask('ticket', initialSettings);
|
||||
|
||||
const row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row.next_run_start_at).toBeNull();
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
});
|
||||
@@ -1,381 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { ConflictError, NotFoundError } from '@backstage/errors';
|
||||
import { CronTime } from 'cron';
|
||||
import { Knex } from 'knex';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
import { DB_TASKS_TABLE, DbTasksRow } from '../database/tables';
|
||||
import { TaskFunction, TaskSettingsV2, taskSettingsV2Schema } from './types';
|
||||
import { delegateAbortController, nowPlus, sleep } from './util';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
|
||||
|
||||
/**
|
||||
* Implements tasks that run across worker hosts, with collaborative locking.
|
||||
*
|
||||
* @private
|
||||
*/
|
||||
export class TaskWorker {
|
||||
constructor(
|
||||
private readonly taskId: string,
|
||||
private readonly fn: TaskFunction,
|
||||
private readonly knex: Knex,
|
||||
private readonly logger: LoggerService,
|
||||
private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY,
|
||||
) {}
|
||||
|
||||
async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
|
||||
try {
|
||||
await this.persistTask(settings);
|
||||
} catch (e) {
|
||||
throw new Error(`Failed to persist task, ${e}`);
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Task worker starting: ${this.taskId}, ${JSON.stringify(settings)}`,
|
||||
);
|
||||
|
||||
let workCheckFrequency = this.workCheckFrequency;
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
if (isDuration) {
|
||||
const cadence = Duration.fromISO(settings.cadence);
|
||||
if (cadence < workCheckFrequency) {
|
||||
workCheckFrequency = cadence;
|
||||
}
|
||||
}
|
||||
|
||||
let attemptNum = 1;
|
||||
(async () => {
|
||||
for (;;) {
|
||||
try {
|
||||
if (settings.initialDelayDuration) {
|
||||
await sleep(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
options?.signal,
|
||||
);
|
||||
}
|
||||
|
||||
while (!options?.signal?.aborted) {
|
||||
const runResult = await this.runOnce(options?.signal);
|
||||
|
||||
if (runResult.result === 'abort') {
|
||||
break;
|
||||
}
|
||||
|
||||
await sleep(workCheckFrequency, options?.signal);
|
||||
}
|
||||
|
||||
this.logger.info(`Task worker finished: ${this.taskId}`);
|
||||
attemptNum = 0;
|
||||
break;
|
||||
} catch (e) {
|
||||
attemptNum += 1;
|
||||
this.logger.warn(
|
||||
`Task worker failed unexpectedly, attempt number ${attemptNum}, ${e}`,
|
||||
);
|
||||
await sleep(Duration.fromObject({ seconds: 1 }));
|
||||
}
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
static async trigger(knex: Knex, taskId: string): Promise<void> {
|
||||
// check if task exists
|
||||
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.select(knex.raw(1))
|
||||
.where('id', '=', taskId);
|
||||
if (rows.length !== 1) {
|
||||
throw new NotFoundError(`Task ${taskId} does not exist`);
|
||||
}
|
||||
|
||||
const updatedRows = await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', taskId)
|
||||
.whereNull('current_run_ticket')
|
||||
.update({
|
||||
next_run_start_at: knex.fn.now(),
|
||||
});
|
||||
if (updatedRows < 1) {
|
||||
throw new ConflictError(`Task ${taskId} is currently running`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a single attempt at running the task to completion, if ready.
|
||||
*
|
||||
* @returns The outcome of the attempt
|
||||
*/
|
||||
private async runOnce(
|
||||
signal?: AbortSignal,
|
||||
): Promise<
|
||||
| { result: 'not-ready-yet' }
|
||||
| { result: 'abort' }
|
||||
| { result: 'failed' }
|
||||
| { result: 'completed' }
|
||||
> {
|
||||
const findResult = await this.findReadyTask();
|
||||
if (
|
||||
findResult.result === 'not-ready-yet' ||
|
||||
findResult.result === 'abort'
|
||||
) {
|
||||
return findResult;
|
||||
}
|
||||
|
||||
const taskSettings = findResult.settings;
|
||||
const ticket = uuid();
|
||||
|
||||
const claimed = await this.tryClaimTask(ticket, taskSettings);
|
||||
if (!claimed) {
|
||||
return { result: 'not-ready-yet' };
|
||||
}
|
||||
|
||||
// Abort the task execution either if the worker is stopped, or if the
|
||||
// task timeout is hit
|
||||
const taskAbortController = delegateAbortController(signal);
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
taskAbortController.abort();
|
||||
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
|
||||
|
||||
try {
|
||||
await this.fn(taskAbortController.signal);
|
||||
taskAbortController.abort(); // releases resources
|
||||
} catch (e) {
|
||||
this.logger.error(e);
|
||||
await this.tryReleaseTask(ticket, taskSettings);
|
||||
return { result: 'failed' };
|
||||
} finally {
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
|
||||
await this.tryReleaseTask(ticket, taskSettings);
|
||||
return { result: 'completed' };
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform the initial store of the task info
|
||||
*/
|
||||
async persistTask(settings: TaskSettingsV2) {
|
||||
// Perform an initial parse to ensure that we will definitely be able to
|
||||
// read it back again.
|
||||
taskSettingsV2Schema.parse(settings);
|
||||
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
const isCron = !isManual && !isDuration;
|
||||
|
||||
let startAt: Knex.Raw | undefined;
|
||||
let nextStartAt: Knex.Raw | undefined;
|
||||
if (settings.initialDelayDuration) {
|
||||
startAt = nowPlus(
|
||||
Duration.fromISO(settings.initialDelayDuration),
|
||||
this.knex,
|
||||
);
|
||||
}
|
||||
|
||||
if (isCron) {
|
||||
const time = new CronTime(settings.cadence)
|
||||
.sendAt()
|
||||
.minus({ seconds: 1 }) // immediately, if "* * * * * *"
|
||||
.toUTC();
|
||||
|
||||
nextStartAt = this.nextRunAtRaw(time);
|
||||
startAt ||= nextStartAt;
|
||||
} else if (isManual) {
|
||||
nextStartAt = this.knex.raw('null');
|
||||
startAt ||= nextStartAt;
|
||||
} else {
|
||||
startAt ||= this.knex.fn.now();
|
||||
nextStartAt = nowPlus(Duration.fromISO(settings.cadence), this.knex);
|
||||
}
|
||||
|
||||
this.logger.debug(`task: ${this.taskId} configured to run at: ${startAt}`);
|
||||
|
||||
// It's OK if the task already exists; if it does, just replace its
|
||||
// settings with the new value and start the loop as usual.
|
||||
const settingsJson = JSON.stringify(settings);
|
||||
await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.insert({
|
||||
id: this.taskId,
|
||||
settings_json: settingsJson,
|
||||
next_run_start_at: startAt,
|
||||
})
|
||||
.onConflict('id')
|
||||
.merge(
|
||||
this.knex.client.config.client.includes('mysql')
|
||||
? {
|
||||
settings_json: settingsJson,
|
||||
next_run_start_at: this.knex.raw(
|
||||
`CASE WHEN ?? < ?? THEN ?? ELSE ?? END`,
|
||||
[
|
||||
nextStartAt,
|
||||
'next_run_start_at',
|
||||
nextStartAt,
|
||||
'next_run_start_at',
|
||||
],
|
||||
),
|
||||
}
|
||||
: {
|
||||
settings_json: this.knex.ref('excluded.settings_json'),
|
||||
next_run_start_at: this.knex.raw(
|
||||
`CASE WHEN ?? < ?? THEN ?? ELSE ?? END`,
|
||||
[
|
||||
nextStartAt,
|
||||
`${DB_TASKS_TABLE}.next_run_start_at`,
|
||||
nextStartAt,
|
||||
`${DB_TASKS_TABLE}.next_run_start_at`,
|
||||
],
|
||||
),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the task is ready to run
|
||||
*/
|
||||
async findReadyTask(): Promise<
|
||||
| { result: 'not-ready-yet' }
|
||||
| { result: 'abort' }
|
||||
| { result: 'ready'; settings: TaskSettingsV2 }
|
||||
> {
|
||||
const [row] = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', this.taskId)
|
||||
.select({
|
||||
settingsJson: 'settings_json',
|
||||
ready: this.knex.raw(
|
||||
`CASE
|
||||
WHEN next_run_start_at <= ? AND current_run_ticket IS NULL THEN TRUE
|
||||
ELSE FALSE
|
||||
END`,
|
||||
[this.knex.fn.now()],
|
||||
),
|
||||
});
|
||||
|
||||
if (!row) {
|
||||
this.logger.info(
|
||||
'No longer able to find task; aborting and assuming that it has been unregistered or expired',
|
||||
);
|
||||
return { result: 'abort' };
|
||||
} else if (!row.ready) {
|
||||
return { result: 'not-ready-yet' };
|
||||
}
|
||||
|
||||
try {
|
||||
const obj = JSON.parse(row.settingsJson);
|
||||
const settings = taskSettingsV2Schema.parse(obj);
|
||||
return { result: 'ready', settings };
|
||||
} catch (e) {
|
||||
this.logger.info(
|
||||
`Task "${this.taskId}" is no longer able to parse task settings; aborting and assuming that a ` +
|
||||
`newer version of the task has been issued and being handled by other workers, ${e}`,
|
||||
);
|
||||
return { result: 'abort' };
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to claim a task that's ready for execution, on this worker's
|
||||
* behalf. We should not attempt to perform the work unless the claim really
|
||||
* goes through.
|
||||
*
|
||||
* @param ticket - A globally unique string that changes for each invocation
|
||||
* @param settings - The settings of the task to claim
|
||||
* @returns True if it was successfully claimed
|
||||
*/
|
||||
async tryClaimTask(
|
||||
ticket: string,
|
||||
settings: TaskSettingsV2,
|
||||
): Promise<boolean> {
|
||||
const startedAt = this.knex.fn.now();
|
||||
const expiresAt = settings.timeoutAfterDuration
|
||||
? nowPlus(Duration.fromISO(settings.timeoutAfterDuration), this.knex)
|
||||
: this.knex.raw('null');
|
||||
|
||||
const rows = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', this.taskId)
|
||||
.whereNull('current_run_ticket')
|
||||
.update({
|
||||
current_run_ticket: ticket,
|
||||
current_run_started_at: startedAt,
|
||||
current_run_expires_at: expiresAt,
|
||||
});
|
||||
|
||||
return rows === 1;
|
||||
}
|
||||
|
||||
async tryReleaseTask(
|
||||
ticket: string,
|
||||
settings: TaskSettingsV2,
|
||||
): Promise<boolean> {
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
const isCron = !isManual && !isDuration;
|
||||
|
||||
let nextRun: Knex.Raw;
|
||||
if (isCron) {
|
||||
const time = new CronTime(settings.cadence).sendAt().toUTC();
|
||||
this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
|
||||
|
||||
nextRun = this.nextRunAtRaw(time);
|
||||
} else if (isManual) {
|
||||
nextRun = this.knex.raw('null');
|
||||
} else {
|
||||
const dt = Duration.fromISO(settings.cadence).as('seconds');
|
||||
this.logger.debug(
|
||||
`task: ${this.taskId} will next occur around ${DateTime.now().plus({
|
||||
seconds: dt,
|
||||
})}`,
|
||||
);
|
||||
|
||||
if (this.knex.client.config.client.includes('sqlite3')) {
|
||||
nextRun = this.knex.raw(
|
||||
`max(datetime(next_run_start_at, ?), datetime('now'))`,
|
||||
[`+${dt} seconds`],
|
||||
);
|
||||
} else if (this.knex.client.config.client.includes('mysql')) {
|
||||
nextRun = this.knex.raw(
|
||||
`greatest(next_run_start_at + interval ${dt} second, now())`,
|
||||
);
|
||||
} else {
|
||||
nextRun = this.knex.raw(
|
||||
`greatest(next_run_start_at + interval '${dt} seconds', now())`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const rows = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', this.taskId)
|
||||
.where('current_run_ticket', '=', ticket)
|
||||
.update({
|
||||
next_run_start_at: nextRun,
|
||||
current_run_ticket: this.knex.raw('null'),
|
||||
current_run_started_at: this.knex.raw('null'),
|
||||
current_run_expires_at: this.knex.raw('null'),
|
||||
});
|
||||
|
||||
return rows === 1;
|
||||
}
|
||||
|
||||
private nextRunAtRaw(time: DateTime): Knex.Raw {
|
||||
if (this.knex.client.config.client.includes('sqlite3')) {
|
||||
return this.knex.raw('datetime(?)', [time.toISO()]);
|
||||
} else if (this.knex.client.config.client.includes('mysql')) {
|
||||
return this.knex.raw(`?`, [time.toSQL({ includeOffset: false })]);
|
||||
}
|
||||
return this.knex.raw(`?`, [time.toISO()]);
|
||||
}
|
||||
}
|
||||
@@ -1,28 +0,0 @@
|
||||
/*
|
||||
* Copyright 2023 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export function createTestScopedSignal(): () => AbortSignal {
|
||||
let testAbortController = new AbortController();
|
||||
|
||||
beforeEach(() => {
|
||||
testAbortController = new AbortController();
|
||||
});
|
||||
afterEach(() => {
|
||||
testAbortController.abort();
|
||||
});
|
||||
|
||||
return () => testAbortController.signal;
|
||||
}
|
||||
@@ -1,27 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export { readTaskScheduleDefinitionFromConfig } from './readTaskScheduleDefinitionFromConfig';
|
||||
export { TaskScheduler } from './TaskScheduler';
|
||||
export type {
|
||||
PluginTaskScheduler,
|
||||
TaskFunction,
|
||||
TaskDescriptor,
|
||||
TaskInvocationDefinition,
|
||||
TaskRunner,
|
||||
TaskScheduleDefinition,
|
||||
TaskScheduleDefinitionConfig,
|
||||
} from './types';
|
||||
@@ -1,131 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { ConfigReader } from '@backstage/config';
|
||||
import { HumanDuration } from '@backstage/types';
|
||||
import { readTaskScheduleDefinitionFromConfig } from './readTaskScheduleDefinitionFromConfig';
|
||||
|
||||
describe('readTaskScheduleDefinitionFromConfig', () => {
|
||||
it('all valid values', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: {
|
||||
cron: '0 30 * * * *',
|
||||
},
|
||||
timeout: 'PT3M',
|
||||
initialDelay: {
|
||||
minutes: 20,
|
||||
},
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
const result = readTaskScheduleDefinitionFromConfig(config);
|
||||
|
||||
expect((result.frequency as { cron: string }).cron).toBe('0 30 * * * *');
|
||||
expect(result.timeout).toEqual({ minutes: 3 });
|
||||
expect((result.initialDelay as HumanDuration).minutes).toEqual(20);
|
||||
expect(result.scope).toBe('global');
|
||||
});
|
||||
|
||||
it('all valid required values', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: {
|
||||
cron: '0 30 * * * *',
|
||||
},
|
||||
timeout: 'PT3M',
|
||||
});
|
||||
|
||||
const result = readTaskScheduleDefinitionFromConfig(config);
|
||||
|
||||
expect((result.frequency as { cron: string }).cron).toBe('0 30 * * * *');
|
||||
expect(result.timeout).toEqual({ minutes: 3 });
|
||||
expect(result.initialDelay).toBeUndefined();
|
||||
expect(result.scope).toBeUndefined();
|
||||
});
|
||||
|
||||
it('fail without required frequency', () => {
|
||||
const config = new ConfigReader({
|
||||
timeout: 'PT3M',
|
||||
});
|
||||
|
||||
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
|
||||
"Missing required config value at 'frequency'",
|
||||
);
|
||||
});
|
||||
|
||||
it('fail without required timeout', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: 'PT30M',
|
||||
});
|
||||
|
||||
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
|
||||
"Missing required config value at 'timeout'",
|
||||
);
|
||||
});
|
||||
|
||||
it('invalid frequency key', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: {
|
||||
invalid: 'value',
|
||||
},
|
||||
timeout: 'PT3M',
|
||||
});
|
||||
|
||||
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
|
||||
"Failed to read duration from config at 'frequency', Error: Needs one or more of 'years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'milliseconds'",
|
||||
);
|
||||
});
|
||||
|
||||
it('invalid frequency value', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: {
|
||||
minutes: 'value',
|
||||
},
|
||||
timeout: 'PT3M',
|
||||
});
|
||||
|
||||
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
|
||||
"Failed to read duration from config, Error: Unable to convert config value for key 'frequency.minutes' in 'mock-config' to a number",
|
||||
);
|
||||
});
|
||||
|
||||
it('frequency value with additional invalid prop', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: {
|
||||
minutes: 20,
|
||||
invalid: 'value',
|
||||
},
|
||||
timeout: 'PT3M',
|
||||
});
|
||||
|
||||
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
|
||||
"Failed to read duration from config at 'frequency', Error: Unknown property 'invalid'; expected one or more of 'years', 'months', 'weeks', 'days', 'hours', 'minutes', 'seconds', 'milliseconds'",
|
||||
);
|
||||
});
|
||||
|
||||
it('invalid scope value', () => {
|
||||
const config = new ConfigReader({
|
||||
frequency: {
|
||||
years: 2,
|
||||
},
|
||||
timeout: 'PT3M',
|
||||
scope: 'invalid',
|
||||
});
|
||||
|
||||
expect(() => readTaskScheduleDefinitionFromConfig(config)).toThrow(
|
||||
'Only "global" or "local" are allowed for TaskScheduleDefinition.scope, but got: invalid',
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -1,85 +0,0 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Config, readDurationFromConfig } from '@backstage/config';
|
||||
import { HumanDuration } from '@backstage/types';
|
||||
import { TaskScheduleDefinition } from './types';
|
||||
import { Duration } from 'luxon';
|
||||
|
||||
function readDuration(config: Config, key: string): HumanDuration {
|
||||
if (typeof config.get(key) === 'string') {
|
||||
const value = config.getString(key);
|
||||
const duration = Duration.fromISO(value);
|
||||
if (!duration.isValid) {
|
||||
throw new Error(`Invalid duration: ${value}`);
|
||||
}
|
||||
return duration.toObject();
|
||||
}
|
||||
|
||||
return readDurationFromConfig(config, { key });
|
||||
}
|
||||
|
||||
function readFrequency(
|
||||
config: Config,
|
||||
key: string,
|
||||
): { cron: string } | Duration | HumanDuration | { trigger: 'manual' } {
|
||||
const value = config.get(key);
|
||||
if (typeof value === 'object' && (value as { cron?: string }).cron) {
|
||||
return value as { cron: string };
|
||||
}
|
||||
if (
|
||||
typeof value === 'object' &&
|
||||
(value as { trigger?: string }).trigger === 'manual'
|
||||
) {
|
||||
return { trigger: 'manual' };
|
||||
}
|
||||
|
||||
return readDuration(config, key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a TaskScheduleDefinition from a Config.
|
||||
* Expects the config not to be the root config,
|
||||
* but the config for the definition.
|
||||
*
|
||||
* @param config - config for a TaskScheduleDefinition.
|
||||
* @public
|
||||
* @deprecated Please import `readSchedulerServiceTaskScheduleDefinitionFromConfig` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export function readTaskScheduleDefinitionFromConfig(
|
||||
config: Config,
|
||||
): TaskScheduleDefinition {
|
||||
const frequency = readFrequency(config, 'frequency');
|
||||
const timeout = readDuration(config, 'timeout');
|
||||
|
||||
const initialDelay = config.has('initialDelay')
|
||||
? readDuration(config, 'initialDelay')
|
||||
: undefined;
|
||||
|
||||
const scope = config.getOptionalString('scope');
|
||||
if (scope && !['global', 'local'].includes(scope)) {
|
||||
throw new Error(
|
||||
`Only "global" or "local" are allowed for TaskScheduleDefinition.scope, but got: ${scope}`,
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
frequency,
|
||||
timeout,
|
||||
initialDelay,
|
||||
scope: scope as 'global' | 'local' | undefined,
|
||||
};
|
||||
}
|
||||
@@ -1,437 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { HumanDuration, JsonObject } from '@backstage/types';
|
||||
import { CronTime } from 'cron';
|
||||
import { Duration } from 'luxon';
|
||||
import { z } from 'zod';
|
||||
|
||||
/**
|
||||
* A function that can be called as a scheduled task.
|
||||
*
|
||||
* It may optionally accept an abort signal argument. When the signal triggers,
|
||||
* processing should abort and return as quickly as possible.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please import `SchedulerServiceTaskFunction` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export type TaskFunction =
|
||||
| ((abortSignal: AbortSignal) => void | Promise<void>)
|
||||
| (() => void | Promise<void>);
|
||||
|
||||
/**
|
||||
* A semi-opaque type to describe an actively scheduled task.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please import `SchedulerServiceTaskDescriptor` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export type TaskDescriptor = {
|
||||
/**
|
||||
* The unique identifier of the task.
|
||||
*/
|
||||
id: string;
|
||||
/**
|
||||
* The scope of the task.
|
||||
*/
|
||||
scope: 'global' | 'local';
|
||||
/**
|
||||
* The settings that control the task flow. This is a semi-opaque structure
|
||||
* that is mainly there for debugging purposes. Do not make any assumptions
|
||||
* about the contents of this field.
|
||||
*/
|
||||
settings: { version: number } & JsonObject;
|
||||
};
|
||||
|
||||
/**
|
||||
* Options that control the scheduling of a task.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please import `SchedulerServiceTaskScheduleDefinition` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export interface TaskScheduleDefinition {
|
||||
/**
|
||||
* How often you want the task to run. The system does its best to avoid
|
||||
* overlapping invocations.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* This is the best effort value; under some circumstances there can be
|
||||
* deviations. For example, if the task runtime is longer than the frequency
|
||||
* and the timeout has not been given or not been exceeded yet, the next
|
||||
* invocation of this task will be delayed until after the previous one
|
||||
* finishes.
|
||||
*
|
||||
* This is a required field.
|
||||
*/
|
||||
frequency:
|
||||
| {
|
||||
/**
|
||||
* A crontab style string.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* Overview:
|
||||
*
|
||||
* ```
|
||||
* ┌────────────── second (optional)
|
||||
* │ ┌──────────── minute
|
||||
* │ │ ┌────────── hour
|
||||
* │ │ │ ┌──────── day of month
|
||||
* │ │ │ │ ┌────── month
|
||||
* │ │ │ │ │ ┌──── day of week
|
||||
* │ │ │ │ │ │
|
||||
* │ │ │ │ │ │
|
||||
* * * * * * *
|
||||
* ```
|
||||
*/
|
||||
cron: string;
|
||||
}
|
||||
| Duration
|
||||
| HumanDuration
|
||||
/**
|
||||
* This task will only run when manually triggered with the `triggerTask` method; no automatic
|
||||
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
|
||||
*/
|
||||
| { trigger: 'manual' };
|
||||
|
||||
/**
|
||||
* The maximum amount of time that a single task invocation can take, before
|
||||
* it's considered timed out and gets "released" such that a new invocation
|
||||
* is permitted to take place (possibly, then, on a different worker).
|
||||
*/
|
||||
timeout: Duration | HumanDuration;
|
||||
|
||||
/**
|
||||
* The amount of time that should pass before the first invocation happens.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* This can be useful in cold start scenarios to stagger or delay some heavy
|
||||
* compute jobs. If no value is given for this field then the first invocation
|
||||
* will happen as soon as possible according to the cadence.
|
||||
*
|
||||
* NOTE: This is a per-worker delay. If you have a cluster of workers all
|
||||
* collaborating on a task that has its `scope` field set to `'global'`, then
|
||||
* you may still see the task being processed by other long-lived workers,
|
||||
* while any given single worker is in its initial sleep delay time e.g. after
|
||||
* a deployment. Therefore, this parameter is not useful for "globally" pausing
|
||||
* work; its main intended use is for individual machines to get a chance to
|
||||
* reach some equilibrium at startup before triggering heavy batch workloads.
|
||||
*/
|
||||
initialDelay?: Duration | HumanDuration;
|
||||
|
||||
/**
|
||||
* Sets the scope of concurrency control / locking to apply for invocations of
|
||||
* this task.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* When the scope is set to the default value `'global'`, the scheduler will
|
||||
* attempt to ensure that only one worker machine runs the task at a time,
|
||||
* according to the given cadence. This means that as the number of worker
|
||||
* hosts increases, the invocation frequency of this task will not go up.
|
||||
* Instead, the load is spread randomly across hosts. This setting is useful
|
||||
* for tasks that access shared resources, for example catalog ingestion tasks
|
||||
* where you do not want many machines to repeatedly import the same data and
|
||||
* trample over each other.
|
||||
*
|
||||
* When the scope is set to `'local'`, there is no concurrency control across
|
||||
* hosts. Each host runs the task according to the given cadence similarly to
|
||||
* `setInterval`, but the runtime ensures that there are no overlapping runs.
|
||||
*
|
||||
* @defaultValue 'global'
|
||||
*/
|
||||
scope?: 'global' | 'local';
|
||||
}
|
||||
|
||||
/**
|
||||
* Config options for {@link TaskScheduleDefinition}
|
||||
* that control the scheduling of a task.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please import `SchedulerServiceTaskScheduleDefinitionConfig` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export interface TaskScheduleDefinitionConfig {
|
||||
/**
|
||||
* How often you want the task to run. The system does its best to avoid
|
||||
* overlapping invocations.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* This is the best effort value; under some circumstances there can be
|
||||
* deviations. For example, if the task runtime is longer than the frequency
|
||||
* and the timeout has not been given or not been exceeded yet, the next
|
||||
* invocation of this task will be delayed until after the previous one
|
||||
* finishes.
|
||||
*
|
||||
* This is a required field.
|
||||
*/
|
||||
frequency:
|
||||
| {
|
||||
/**
|
||||
* A crontab style string.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* Overview:
|
||||
*
|
||||
* ```
|
||||
* ┌────────────── second (optional)
|
||||
* │ ┌──────────── minute
|
||||
* │ │ ┌────────── hour
|
||||
* │ │ │ ┌──────── day of month
|
||||
* │ │ │ │ ┌────── month
|
||||
* │ │ │ │ │ ┌──── day of week
|
||||
* │ │ │ │ │ │
|
||||
* │ │ │ │ │ │
|
||||
* * * * * * *
|
||||
* ```
|
||||
*/
|
||||
cron: string;
|
||||
}
|
||||
| string
|
||||
| HumanDuration;
|
||||
|
||||
/**
|
||||
* The maximum amount of time that a single task invocation can take, before
|
||||
* it's considered timed out and gets "released" such that a new invocation
|
||||
* is permitted to take place (possibly, then, on a different worker).
|
||||
*/
|
||||
timeout: string | HumanDuration;
|
||||
|
||||
/**
|
||||
* The amount of time that should pass before the first invocation happens.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* This can be useful in cold start scenarios to stagger or delay some heavy
|
||||
* compute jobs. If no value is given for this field then the first invocation
|
||||
* will happen as soon as possible according to the cadence.
|
||||
*
|
||||
* NOTE: This is a per-worker delay. If you have a cluster of workers all
|
||||
* collaborating on a task that has its `scope` field set to `'global'`, then
|
||||
* you may still see the task being processed by other long-lived workers,
|
||||
* while any given single worker is in its initial sleep delay time e.g. after
|
||||
* a deployment. Therefore, this parameter is not useful for "globally" pausing
|
||||
* work; its main intended use is for individual machines to get a chance to
|
||||
* reach some equilibrium at startup before triggering heavy batch workloads.
|
||||
*/
|
||||
initialDelay?: string | HumanDuration;
|
||||
|
||||
/**
|
||||
* Sets the scope of concurrency control / locking to apply for invocations of
|
||||
* this task.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* When the scope is set to the default value `'global'`, the scheduler will
|
||||
* attempt to ensure that only one worker machine runs the task at a time,
|
||||
* according to the given cadence. This means that as the number of worker
|
||||
* hosts increases, the invocation frequency of this task will not go up.
|
||||
* Instead, the load is spread randomly across hosts. This setting is useful
|
||||
* for tasks that access shared resources, for example catalog ingestion tasks
|
||||
* where you do not want many machines to repeatedly import the same data and
|
||||
* trample over each other.
|
||||
*
|
||||
* When the scope is set to `'local'`, there is no concurrency control across
|
||||
* hosts. Each host runs the task according to the given cadence similarly to
|
||||
* `setInterval`, but the runtime ensures that there are no overlapping runs.
|
||||
*
|
||||
* @defaultValue 'global'
|
||||
*/
|
||||
scope?: 'global' | 'local';
|
||||
}
|
||||
|
||||
/**
|
||||
* Options that apply to the invocation of a given task.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please import `SchedulerServiceTaskInvocationDefinition` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export interface TaskInvocationDefinition {
|
||||
/**
|
||||
* A unique ID (within the scope of the plugin) for the task.
|
||||
*/
|
||||
id: string;
|
||||
|
||||
/**
|
||||
* The actual task function to be invoked regularly.
|
||||
*/
|
||||
fn: TaskFunction;
|
||||
|
||||
/**
|
||||
* An abort signal that, when triggered, will stop the recurring execution of
|
||||
* the task.
|
||||
*/
|
||||
signal?: AbortSignal;
|
||||
}
|
||||
|
||||
/**
|
||||
* A previously prepared task schedule, ready to be invoked.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please import `SchedulerServiceTaskRunner` from `@backstage/backend-plugin-api` instead
|
||||
*/
|
||||
export interface TaskRunner {
|
||||
/**
|
||||
* Takes the schedule and executes an actual task using it.
|
||||
*
|
||||
* @param task - The actual runtime properties of the task
|
||||
*/
|
||||
run(task: TaskInvocationDefinition): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Deals with the scheduling of distributed tasks, for a given plugin.
|
||||
*
|
||||
* @public
|
||||
* @deprecated Please use `SchedulerService` from `@backstage/backend-plugin-api` instead (most likely via `coreServices.scheduler`)
|
||||
*/
|
||||
export interface PluginTaskScheduler {
|
||||
/**
|
||||
* Manually triggers a task by ID.
|
||||
*
|
||||
* If the task doesn't exist, a NotFoundError is thrown. If the task is
|
||||
* currently running, a ConflictError is thrown.
|
||||
*
|
||||
* @param id - The task ID
|
||||
*/
|
||||
triggerTask(id: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* Schedules a task function for recurring runs.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* The `scope` task field controls whether to use coordinated exclusive
|
||||
* invocation across workers, or to just coordinate within the current worker.
|
||||
*
|
||||
* This convenience method performs both the scheduling and invocation in one
|
||||
* go.
|
||||
*
|
||||
* @param task - The task definition
|
||||
*/
|
||||
scheduleTask(
|
||||
task: TaskScheduleDefinition & TaskInvocationDefinition,
|
||||
): Promise<void>;
|
||||
|
||||
/**
|
||||
* Creates a scheduled but dormant recurring task, ready to be launched at a
|
||||
* later time.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* This method is useful for pre-creating a schedule in outer code to be
|
||||
* passed into an inner implementation, such that the outer code controls
|
||||
* scheduling while inner code controls implementation.
|
||||
*
|
||||
* @param schedule - The task schedule
|
||||
*/
|
||||
createScheduledTaskRunner(schedule: TaskScheduleDefinition): TaskRunner;
|
||||
|
||||
/**
|
||||
* Returns all scheduled tasks registered to this scheduler.
|
||||
*
|
||||
* @remarks
|
||||
*
|
||||
* This method is useful for triggering tasks manually using the triggerTask
|
||||
* functionality. Note that the returned tasks contain only tasks that have
|
||||
* been initialized in this instance of the scheduler.
|
||||
*
|
||||
* @returns Scheduled tasks
|
||||
*/
|
||||
getScheduledTasks(): Promise<TaskDescriptor[]>;
|
||||
}
|
||||
|
||||
function isValidOptionalDurationString(d: string | undefined): boolean {
|
||||
try {
|
||||
return !d || Duration.fromISO(d).isValid;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function isValidCronFormat(c: string | undefined): boolean {
|
||||
try {
|
||||
if (!c) {
|
||||
return false;
|
||||
}
|
||||
// parse cron format to ensure it's a valid format.
|
||||
// eslint-disable-next-line no-new
|
||||
new CronTime(c);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
function isValidTrigger(t: string): boolean {
|
||||
return t === 'manual';
|
||||
}
|
||||
|
||||
export const taskSettingsV1Schema = z.object({
|
||||
version: z.literal(1),
|
||||
initialDelayDuration: z
|
||||
.string()
|
||||
.optional()
|
||||
.refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
}),
|
||||
recurringAtMostEveryDuration: z
|
||||
.string()
|
||||
.refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
}),
|
||||
timeoutAfterDuration: z.string().refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
}),
|
||||
});
|
||||
|
||||
/**
|
||||
* The properties that control a scheduled task (version 1).
|
||||
*/
|
||||
export type TaskSettingsV1 = z.infer<typeof taskSettingsV1Schema>;
|
||||
|
||||
export const taskSettingsV2Schema = z.object({
|
||||
version: z.literal(2),
|
||||
cadence: z
|
||||
.string()
|
||||
.refine(isValidCronFormat, { message: 'Invalid cron' })
|
||||
.or(
|
||||
z.string().refine(isValidTrigger, {
|
||||
message: "Invalid trigger, expecting 'manual'",
|
||||
}),
|
||||
)
|
||||
.or(
|
||||
z.string().refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
}),
|
||||
),
|
||||
timeoutAfterDuration: z.string().refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
}),
|
||||
initialDelayDuration: z
|
||||
.string()
|
||||
.optional()
|
||||
.refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
}),
|
||||
});
|
||||
|
||||
/**
|
||||
* The properties that control a scheduled task (version 2).
|
||||
*/
|
||||
export type TaskSettingsV2 = z.infer<typeof taskSettingsV2Schema>;
|
||||
@@ -1,113 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import knexFactory, { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import { delegateAbortController, nowPlus, sleep, validateId } from './util';
|
||||
|
||||
class KnexBuilder {
|
||||
public build(client: string): Knex {
|
||||
return knexFactory({ client, useNullAsDefault: true });
|
||||
}
|
||||
}
|
||||
|
||||
describe('util', () => {
|
||||
describe('validateId', () => {
|
||||
it.each(['a', 'a_b', 'ab123c_2', 'a!', 'A', 'a-b', 'a.b', '_a', 'a_'])(
|
||||
'accepts valid inputs, %p',
|
||||
async input => {
|
||||
expect(validateId(input)).toBeUndefined();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(['', null, Symbol('a')])(
|
||||
'rejects invalid inputs, %p',
|
||||
async input => {
|
||||
expect(() => validateId(input as any)).toThrow();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('sleep', () => {
|
||||
it('finishes the wait as expected with no signal', async () => {
|
||||
const ac = new AbortController();
|
||||
const start = Date.now();
|
||||
await sleep(Duration.fromObject({ seconds: 1 }), ac.signal);
|
||||
expect(Date.now() - start).toBeGreaterThan(800);
|
||||
}, 5_000);
|
||||
|
||||
it('aborts properly on the signal', async () => {
|
||||
const ac = new AbortController();
|
||||
const promise = sleep(Duration.fromObject({ seconds: 10 }), ac.signal);
|
||||
ac.abort();
|
||||
await promise;
|
||||
expect(true).toBe(true);
|
||||
}, 1_000);
|
||||
});
|
||||
|
||||
describe('delegateAbortController', () => {
|
||||
it('inherits parent abort state', () => {
|
||||
const parent = new AbortController();
|
||||
const child = delegateAbortController(parent.signal);
|
||||
expect(parent.signal.aborted).toBe(false);
|
||||
expect(child.signal.aborted).toBe(false);
|
||||
parent.abort();
|
||||
expect(parent.signal.aborted).toBe(true);
|
||||
expect(child.signal.aborted).toBe(true);
|
||||
});
|
||||
|
||||
it('does not inherit from child to parent', () => {
|
||||
const parent = new AbortController();
|
||||
const child = delegateAbortController(parent.signal);
|
||||
expect(parent.signal.aborted).toBe(false);
|
||||
expect(child.signal.aborted).toBe(false);
|
||||
child.abort();
|
||||
expect(parent.signal.aborted).toBe(false);
|
||||
expect(child.signal.aborted).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe('nowPlus', () => {
|
||||
describe('without duration', () => {
|
||||
const databases = [
|
||||
{ client: 'sqlite3', expected: 'CURRENT_TIMESTAMP' },
|
||||
{ client: 'mysql2', expected: 'CURRENT_TIMESTAMP' },
|
||||
{ client: 'pg', expected: 'CURRENT_TIMESTAMP' },
|
||||
];
|
||||
|
||||
it.each(databases)('for client $client', ({ client, expected }) => {
|
||||
const knex = new KnexBuilder().build(client);
|
||||
const result = nowPlus(undefined, knex);
|
||||
|
||||
expect(result.toString()).toBe(expected);
|
||||
});
|
||||
});
|
||||
describe('With duration', () => {
|
||||
const databases = [
|
||||
{ client: 'sqlite3', expected: "datetime('now', '20 seconds')" },
|
||||
{ client: 'mysql2', expected: 'now() + interval 20 second' },
|
||||
{ client: 'pg', expected: "now() + interval '20 seconds'" },
|
||||
];
|
||||
it.each(databases)('for client $client', ({ client, expected }) => {
|
||||
const duration = Duration.fromObject({ seconds: 20 });
|
||||
const knex = new KnexBuilder().build(client);
|
||||
const result = nowPlus(duration, knex);
|
||||
|
||||
expect(result.toString()).toBe(expected);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,111 +0,0 @@
|
||||
/*
|
||||
* Copyright 2021 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { InputError } from '@backstage/errors';
|
||||
import { Knex } from 'knex';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
|
||||
// Keep the IDs compatible with e.g. Prometheus labels
|
||||
export function validateId(id: string) {
|
||||
if (typeof id !== 'string' || !id.trim()) {
|
||||
throw new InputError(`${id} is not a valid ID, expected non-empty string`);
|
||||
}
|
||||
}
|
||||
|
||||
export function dbTime(t: Date | string): DateTime {
|
||||
if (typeof t === 'string') {
|
||||
return DateTime.fromSQL(t);
|
||||
}
|
||||
return DateTime.fromJSDate(t);
|
||||
}
|
||||
|
||||
export function nowPlus(duration: Duration | undefined, knex: Knex) {
|
||||
const seconds = duration?.as('seconds') ?? 0;
|
||||
if (!seconds) {
|
||||
return knex.fn.now();
|
||||
}
|
||||
|
||||
if (knex.client.config.client.includes('sqlite3')) {
|
||||
return knex.raw(`datetime('now', ?)`, [`${seconds} seconds`]);
|
||||
}
|
||||
|
||||
if (knex.client.config.client.includes('mysql')) {
|
||||
return knex.raw(`now() + interval ${seconds} second`);
|
||||
}
|
||||
|
||||
return knex.raw(`now() + interval '${seconds} seconds'`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sleep for the given duration, but return sooner if the abort signal
|
||||
* triggers.
|
||||
*
|
||||
* @param duration - The amount of time to sleep, at most
|
||||
* @param abortSignal - An optional abort signal that short circuits the wait
|
||||
*/
|
||||
export async function sleep(
|
||||
duration: Duration,
|
||||
abortSignal?: AbortSignal,
|
||||
): Promise<void> {
|
||||
if (abortSignal?.aborted) {
|
||||
return;
|
||||
}
|
||||
|
||||
await new Promise<void>(resolve => {
|
||||
let timeoutHandle: NodeJS.Timeout | undefined = undefined;
|
||||
|
||||
const done = () => {
|
||||
if (timeoutHandle) {
|
||||
clearTimeout(timeoutHandle);
|
||||
}
|
||||
abortSignal?.removeEventListener('abort', done);
|
||||
resolve();
|
||||
};
|
||||
|
||||
timeoutHandle = setTimeout(done, duration.as('milliseconds'));
|
||||
abortSignal?.addEventListener('abort', done);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new AbortController that, in addition to working as a regular
|
||||
* standalone controller, also gets aborted if the given parent signal
|
||||
* reaches aborted state.
|
||||
*
|
||||
* @param parent - The "parent" signal that can trigger the delegate
|
||||
*/
|
||||
export function delegateAbortController(parent?: AbortSignal): AbortController {
|
||||
const delegate = new AbortController();
|
||||
|
||||
if (parent) {
|
||||
if (parent.aborted) {
|
||||
delegate.abort();
|
||||
} else {
|
||||
const onParentAborted = () => {
|
||||
delegate.abort();
|
||||
};
|
||||
|
||||
const onChildAborted = () => {
|
||||
parent.removeEventListener('abort', onParentAborted);
|
||||
};
|
||||
|
||||
parent.addEventListener('abort', onParentAborted, { once: true });
|
||||
delegate.signal.addEventListener('abort', onChildAborted, { once: true });
|
||||
}
|
||||
}
|
||||
|
||||
return delegate;
|
||||
}
|
||||
@@ -34,7 +34,6 @@ import { version as root } from '../../../../package.json';
|
||||
import { version as appDefaults } from '../../../app-defaults/package.json';
|
||||
import { version as backendCommon } from '../../../backend-common/package.json';
|
||||
import { version as backendDefaults } from '../../../backend-defaults/package.json';
|
||||
import { version as backendTasks } from '../../../backend-tasks/package.json';
|
||||
import { version as catalogClient } from '../../../catalog-client/package.json';
|
||||
import { version as catalogModel } from '../../../catalog-model/package.json';
|
||||
import { version as cli } from '../../../cli/package.json';
|
||||
@@ -91,7 +90,6 @@ export const packageVersions = {
|
||||
'@backstage/app-defaults': appDefaults,
|
||||
'@backstage/backend-common': backendCommon,
|
||||
'@backstage/backend-defaults': backendDefaults,
|
||||
'@backstage/backend-tasks': backendTasks,
|
||||
'@backstage/catalog-client': catalogClient,
|
||||
'@backstage/catalog-model': catalogModel,
|
||||
'@backstage/cli': cli,
|
||||
|
||||
@@ -18,7 +18,6 @@
|
||||
"dependencies": {
|
||||
"@backstage/backend-common": "^{{version '@backstage/backend-common'}}",
|
||||
"@backstage/backend-defaults": "^{{version '@backstage/backend-defaults'}}",
|
||||
"@backstage/backend-tasks": "^{{version '@backstage/backend-tasks'}}",
|
||||
"@backstage/config": "^{{version '@backstage/config'}}",
|
||||
"@backstage/plugin-app-backend": "^{{version '@backstage/plugin-app-backend'}}",
|
||||
"@backstage/plugin-auth-backend": "^{{version '@backstage/plugin-auth-backend'}}",
|
||||
|
||||
@@ -3771,29 +3771,6 @@ __metadata:
|
||||
languageName: unknown
|
||||
linkType: soft
|
||||
|
||||
"@backstage/backend-tasks@workspace:packages/backend-tasks":
|
||||
version: 0.0.0-use.local
|
||||
resolution: "@backstage/backend-tasks@workspace:packages/backend-tasks"
|
||||
dependencies:
|
||||
"@backstage/backend-common": "workspace:^"
|
||||
"@backstage/backend-plugin-api": "workspace:^"
|
||||
"@backstage/backend-test-utils": "workspace:^"
|
||||
"@backstage/cli": "workspace:^"
|
||||
"@backstage/config": "workspace:^"
|
||||
"@backstage/errors": "workspace:^"
|
||||
"@backstage/types": "workspace:^"
|
||||
"@opentelemetry/api": ^1.3.0
|
||||
"@types/luxon": ^3.0.0
|
||||
cron: ^3.0.0
|
||||
knex: ^3.0.0
|
||||
lodash: ^4.17.21
|
||||
luxon: ^3.0.0
|
||||
uuid: ^9.0.0
|
||||
wait-for-expect: ^3.0.2
|
||||
zod: ^3.22.4
|
||||
languageName: unknown
|
||||
linkType: soft
|
||||
|
||||
"@backstage/backend-test-utils@workspace:^, @backstage/backend-test-utils@workspace:packages/backend-test-utils":
|
||||
version: 0.0.0-use.local
|
||||
resolution: "@backstage/backend-test-utils@workspace:packages/backend-test-utils"
|
||||
|
||||
Reference in New Issue
Block a user