Allow manual trigger frequency in backend-tasks

Signed-off-by: Tim Hansen <timbonicush@spotify.com>
This commit is contained in:
Tim Hansen
2024-07-12 15:24:05 -06:00
parent 22cb1fcf2a
commit ba9abf4fba
14 changed files with 180 additions and 17 deletions
+6
View File
@@ -0,0 +1,6 @@
---
'@backstage/backend-plugin-api': patch
'@backstage/backend-defaults': patch
---
The `SchedulerService` now allows tasks with `frequency: { trigger: 'manual' }`. This means that the task will not be scheduled, but rather run only when manually triggered with `SchedulerService.triggerTask`.
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/backend-tasks': patch
---
The `PluginTaskScheduler` now allows tasks with `frequency: { trigger: 'manual' }`. This means that the task will not be scheduled, but rather run only when manually triggered with `PluginTaskScheduler.triggerTask`.
@@ -0,0 +1,37 @@
/*
* Copyright 2024 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// @ts-check
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.up = async function up(knex) {
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
table.setNullable('next_run_start_at');
});
};
/**
* @param { import("knex").Knex } knex
* @returns { Promise<void> }
*/
exports.down = async function down(knex) {
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
table.dropNullable('next_run_start_at');
});
};
@@ -347,6 +347,9 @@ describe('PluginTaskManagerImpl', () => {
expect(parseDuration({ milliseconds: 5000 })).toEqual('PT5S');
expect(parseDuration(Duration.fromMillis(5000))).toEqual('PT5S');
expect(parseDuration({ cron: '1 * * * *' })).toEqual('1 * * * *');
expect(parseDuration({ trigger: 'manual' })).toEqual({
trigger: 'manual',
});
});
});
});
@@ -155,6 +155,9 @@ export function parseDuration(
if (typeof frequency === 'object' && 'cron' in frequency) {
return frequency.cron;
}
if (typeof frequency === 'object' && 'trigger' in frequency) {
return frequency.trigger;
}
const parsed = Duration.isDuration(frequency)
? frequency
@@ -503,4 +503,32 @@ describe('TaskWorker', () => {
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'next_run_start_at is not set for manually-triggered tasks, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const initialSettings: TaskSettingsV2 = {
version: 2,
cadence: 'manual',
timeoutAfterDuration: 'PT1M',
};
const worker = new TaskWorker('task99', fn, knex, logger);
await worker.persistTask(initialSettings);
await worker.tryClaimTask('ticket', initialSettings);
await worker.tryReleaseTask('ticket', initialSettings);
const row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row.next_run_start_at).toBeNull();
await knex.destroy();
},
);
});
@@ -53,8 +53,8 @@ export class TaskWorker {
);
let workCheckFrequency = this.workCheckFrequency;
const isCron = !settings?.cadence.startsWith('P');
if (!isCron) {
const isDuration = settings?.cadence.startsWith('P');
if (isDuration) {
const cadence = Duration.fromISO(settings.cadence);
if (cadence < workCheckFrequency) {
workCheckFrequency = cadence;
@@ -175,7 +175,9 @@ export class TaskWorker {
// read it back again.
taskSettingsV2Schema.parse(settings);
const isCron = !settings?.cadence.startsWith('P');
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
const isCron = !isManual && !isDuration;
let startAt: Knex.Raw | undefined;
let nextStartAt: Knex.Raw | undefined;
@@ -194,6 +196,9 @@ export class TaskWorker {
nextStartAt = this.nextRunAtRaw(time);
startAt ||= nextStartAt;
} else if (isManual) {
nextStartAt = this.knex.raw('null');
startAt ||= nextStartAt;
} else {
startAt ||= this.knex.fn.now();
nextStartAt = nowPlus(Duration.fromISO(settings.cadence), this.knex);
@@ -317,7 +322,9 @@ export class TaskWorker {
ticket: string,
settings: TaskSettingsV2,
): Promise<boolean> {
const isCron = !settings?.cadence.startsWith('P');
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
const isCron = !isManual && !isDuration;
let nextRun: Knex.Raw;
if (isCron) {
@@ -325,6 +332,8 @@ export class TaskWorker {
this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
nextRun = this.nextRunAtRaw(time);
} else if (isManual) {
nextRun = this.knex.raw('null');
} else {
const dt = Duration.fromISO(settings.cadence).as('seconds');
this.logger.debug(
@@ -40,6 +40,10 @@ function isValidCronFormat(c: string | undefined): boolean {
}
}
function isValidTrigger(t: string): boolean {
return t === 'manual';
}
export const taskSettingsV1Schema = z.object({
version: z.literal(1),
initialDelayDuration: z
@@ -68,6 +72,11 @@ export const taskSettingsV2Schema = z.object({
cadence: z
.string()
.refine(isValidCronFormat, { message: 'Invalid cron' })
.or(
z.string().refine(isValidTrigger, {
message: "Invalid trigger, expecting 'manual'",
}),
)
.or(
z.string().refine(isValidOptionalDurationString, {
message: 'Invalid duration, expecting ISO Period',
@@ -96,7 +96,8 @@ export interface SchedulerServiceTaskScheduleDefinition {
cron: string;
}
| Duration
| HumanDuration;
| HumanDuration
| { trigger: 'manual' };
/**
* The maximum amount of time that a single task invocation can take, before
@@ -193,7 +194,12 @@ export interface SchedulerServiceTaskScheduleDefinitionConfig {
cron: string;
}
| string
| HumanDuration;
| HumanDuration
/**
* This task will only run when manually triggered with the `triggerTask` method; no automatic
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
*/
| { trigger: 'manual' };
/**
* The maximum amount of time that a single task invocation can take, before
@@ -361,14 +367,20 @@ function readDuration(config: Config, key: string): HumanDuration {
return readDurationFromConfig(config, { key });
}
function readCronOrDuration(
function readFrequency(
config: Config,
key: string,
): { cron: string } | HumanDuration {
): { cron: string } | HumanDuration | { trigger: 'manual' } {
const value = config.get(key);
if (typeof value === 'object' && (value as { cron?: string }).cron) {
return value as { cron: string };
}
if (
typeof value === 'object' &&
(value as { trigger?: string }).trigger === 'manual'
) {
return { trigger: 'manual' };
}
return readDuration(config, key);
}
@@ -383,7 +395,7 @@ function readCronOrDuration(
export function readSchedulerServiceTaskScheduleDefinitionFromConfig(
config: Config,
): SchedulerServiceTaskScheduleDefinition {
const frequency = readCronOrDuration(config, 'frequency');
const frequency = readFrequency(config, 'frequency');
const timeout = readDuration(config, 'timeout');
const initialDelay = config.has('initialDelay')
@@ -152,6 +152,9 @@ export function parseDuration(
if ('cron' in frequency) {
return frequency.cron;
}
if ('trigger' in frequency) {
return frequency.trigger;
}
const parsed = Duration.isDuration(frequency)
? frequency
@@ -503,4 +503,32 @@ describe('TaskWorker', () => {
await knex.destroy();
},
);
it.each(databases.eachSupportedId())(
'next_run_start_at is not set for manually-triggered tasks, %p',
async databaseId => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);
const fn = jest.fn(
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
);
const initialSettings: TaskSettingsV2 = {
version: 2,
cadence: 'manual',
timeoutAfterDuration: 'PT1M',
};
const worker = new TaskWorker('task99', fn, knex, logger);
await worker.persistTask(initialSettings);
await worker.tryClaimTask('ticket', initialSettings);
await worker.tryReleaseTask('ticket', initialSettings);
const row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
expect(row.next_run_start_at).toBeNull();
await knex.destroy();
},
);
});
+13 -4
View File
@@ -52,8 +52,8 @@ export class TaskWorker {
);
let workCheckFrequency = this.workCheckFrequency;
const isCron = !settings?.cadence.startsWith('P');
if (!isCron) {
const isDuration = settings?.cadence.startsWith('P');
if (isDuration) {
const cadence = Duration.fromISO(settings.cadence);
if (cadence < workCheckFrequency) {
workCheckFrequency = cadence;
@@ -174,7 +174,9 @@ export class TaskWorker {
// read it back again.
taskSettingsV2Schema.parse(settings);
const isCron = !settings?.cadence.startsWith('P');
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
const isCron = !isManual && !isDuration;
let startAt: Knex.Raw | undefined;
let nextStartAt: Knex.Raw | undefined;
@@ -193,6 +195,9 @@ export class TaskWorker {
nextStartAt = this.nextRunAtRaw(time);
startAt ||= nextStartAt;
} else if (isManual) {
nextStartAt = this.knex.raw('null');
startAt ||= nextStartAt;
} else {
startAt ||= this.knex.fn.now();
nextStartAt = nowPlus(Duration.fromISO(settings.cadence), this.knex);
@@ -316,7 +321,9 @@ export class TaskWorker {
ticket: string,
settings: TaskSettingsV2,
): Promise<boolean> {
const isCron = !settings?.cadence.startsWith('P');
const isManual = settings?.cadence === 'manual';
const isDuration = settings?.cadence.startsWith('P');
const isCron = !isManual && !isDuration;
let nextRun: Knex.Raw;
if (isCron) {
@@ -324,6 +331,8 @@ export class TaskWorker {
this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
nextRun = this.nextRunAtRaw(time);
} else if (isManual) {
nextRun = this.knex.raw('null');
} else {
const dt = Duration.fromISO(settings.cadence).as('seconds');
this.logger.debug(
@@ -32,14 +32,20 @@ function readDuration(config: Config, key: string): HumanDuration {
return readDurationFromConfig(config, { key });
}
function readCronOrDuration(
function readFrequency(
config: Config,
key: string,
): { cron: string } | Duration | HumanDuration {
): { cron: string } | Duration | HumanDuration | { trigger: 'manual' } {
const value = config.get(key);
if (typeof value === 'object' && (value as { cron?: string }).cron) {
return value as { cron: string };
}
if (
typeof value === 'object' &&
(value as { trigger?: string }).trigger === 'manual'
) {
return { trigger: 'manual' };
}
return readDuration(config, key);
}
@@ -56,7 +62,7 @@ function readCronOrDuration(
export function readTaskScheduleDefinitionFromConfig(
config: Config,
): TaskScheduleDefinition {
const frequency = readCronOrDuration(config, 'frequency');
const frequency = readFrequency(config, 'frequency');
const timeout = readDuration(config, 'timeout');
const initialDelay = config.has('initialDelay')
+6 -1
View File
@@ -100,7 +100,12 @@ export interface TaskScheduleDefinition {
cron: string;
}
| Duration
| HumanDuration;
| HumanDuration
/**
* This task will only run when manually triggered with the `triggerTask` method; no automatic
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
*/
| { trigger: 'manual' };
/**
* The maximum amount of time that a single task invocation can take, before