Allow manual trigger frequency in backend-tasks
Signed-off-by: Tim Hansen <timbonicush@spotify.com>
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
---
|
||||
'@backstage/backend-plugin-api': patch
|
||||
'@backstage/backend-defaults': patch
|
||||
---
|
||||
|
||||
The `SchedulerService` now allows tasks with `frequency: { trigger: 'manual' }`. This means that the task will not be scheduled, but rather run only when manually triggered with `SchedulerService.triggerTask`.
|
||||
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/backend-tasks': patch
|
||||
---
|
||||
|
||||
The `PluginTaskScheduler` now allows tasks with `frequency: { trigger: 'manual' }`. This means that the task will not be scheduled, but rather run only when manually triggered with `PluginTaskScheduler.triggerTask`.
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// @ts-check
|
||||
|
||||
/**
|
||||
* @param { import("knex").Knex } knex
|
||||
* @returns { Promise<void> }
|
||||
*/
|
||||
exports.up = async function up(knex) {
|
||||
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
|
||||
table.setNullable('next_run_start_at');
|
||||
});
|
||||
};
|
||||
|
||||
/**
|
||||
* @param { import("knex").Knex } knex
|
||||
* @returns { Promise<void> }
|
||||
*/
|
||||
exports.down = async function down(knex) {
|
||||
await knex.schema.alterTable('backstage_backend_tasks__tasks', table => {
|
||||
table.dropNullable('next_run_start_at');
|
||||
});
|
||||
};
|
||||
+3
@@ -347,6 +347,9 @@ describe('PluginTaskManagerImpl', () => {
|
||||
expect(parseDuration({ milliseconds: 5000 })).toEqual('PT5S');
|
||||
expect(parseDuration(Duration.fromMillis(5000))).toEqual('PT5S');
|
||||
expect(parseDuration({ cron: '1 * * * *' })).toEqual('1 * * * *');
|
||||
expect(parseDuration({ trigger: 'manual' })).toEqual({
|
||||
trigger: 'manual',
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -155,6 +155,9 @@ export function parseDuration(
|
||||
if (typeof frequency === 'object' && 'cron' in frequency) {
|
||||
return frequency.cron;
|
||||
}
|
||||
if (typeof frequency === 'object' && 'trigger' in frequency) {
|
||||
return frequency.trigger;
|
||||
}
|
||||
|
||||
const parsed = Duration.isDuration(frequency)
|
||||
? frequency
|
||||
|
||||
@@ -503,4 +503,32 @@ describe('TaskWorker', () => {
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'next_run_start_at is not set for manually-triggered tasks, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
|
||||
const initialSettings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: 'manual',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task99', fn, knex, logger);
|
||||
await worker.persistTask(initialSettings);
|
||||
await worker.tryClaimTask('ticket', initialSettings);
|
||||
await worker.tryReleaseTask('ticket', initialSettings);
|
||||
|
||||
const row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row.next_run_start_at).toBeNull();
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -53,8 +53,8 @@ export class TaskWorker {
|
||||
);
|
||||
|
||||
let workCheckFrequency = this.workCheckFrequency;
|
||||
const isCron = !settings?.cadence.startsWith('P');
|
||||
if (!isCron) {
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
if (isDuration) {
|
||||
const cadence = Duration.fromISO(settings.cadence);
|
||||
if (cadence < workCheckFrequency) {
|
||||
workCheckFrequency = cadence;
|
||||
@@ -175,7 +175,9 @@ export class TaskWorker {
|
||||
// read it back again.
|
||||
taskSettingsV2Schema.parse(settings);
|
||||
|
||||
const isCron = !settings?.cadence.startsWith('P');
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
const isCron = !isManual && !isDuration;
|
||||
|
||||
let startAt: Knex.Raw | undefined;
|
||||
let nextStartAt: Knex.Raw | undefined;
|
||||
@@ -194,6 +196,9 @@ export class TaskWorker {
|
||||
|
||||
nextStartAt = this.nextRunAtRaw(time);
|
||||
startAt ||= nextStartAt;
|
||||
} else if (isManual) {
|
||||
nextStartAt = this.knex.raw('null');
|
||||
startAt ||= nextStartAt;
|
||||
} else {
|
||||
startAt ||= this.knex.fn.now();
|
||||
nextStartAt = nowPlus(Duration.fromISO(settings.cadence), this.knex);
|
||||
@@ -317,7 +322,9 @@ export class TaskWorker {
|
||||
ticket: string,
|
||||
settings: TaskSettingsV2,
|
||||
): Promise<boolean> {
|
||||
const isCron = !settings?.cadence.startsWith('P');
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
const isCron = !isManual && !isDuration;
|
||||
|
||||
let nextRun: Knex.Raw;
|
||||
if (isCron) {
|
||||
@@ -325,6 +332,8 @@ export class TaskWorker {
|
||||
this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
|
||||
|
||||
nextRun = this.nextRunAtRaw(time);
|
||||
} else if (isManual) {
|
||||
nextRun = this.knex.raw('null');
|
||||
} else {
|
||||
const dt = Duration.fromISO(settings.cadence).as('seconds');
|
||||
this.logger.debug(
|
||||
|
||||
@@ -40,6 +40,10 @@ function isValidCronFormat(c: string | undefined): boolean {
|
||||
}
|
||||
}
|
||||
|
||||
function isValidTrigger(t: string): boolean {
|
||||
return t === 'manual';
|
||||
}
|
||||
|
||||
export const taskSettingsV1Schema = z.object({
|
||||
version: z.literal(1),
|
||||
initialDelayDuration: z
|
||||
@@ -68,6 +72,11 @@ export const taskSettingsV2Schema = z.object({
|
||||
cadence: z
|
||||
.string()
|
||||
.refine(isValidCronFormat, { message: 'Invalid cron' })
|
||||
.or(
|
||||
z.string().refine(isValidTrigger, {
|
||||
message: "Invalid trigger, expecting 'manual'",
|
||||
}),
|
||||
)
|
||||
.or(
|
||||
z.string().refine(isValidOptionalDurationString, {
|
||||
message: 'Invalid duration, expecting ISO Period',
|
||||
|
||||
@@ -96,7 +96,8 @@ export interface SchedulerServiceTaskScheduleDefinition {
|
||||
cron: string;
|
||||
}
|
||||
| Duration
|
||||
| HumanDuration;
|
||||
| HumanDuration
|
||||
| { trigger: 'manual' };
|
||||
|
||||
/**
|
||||
* The maximum amount of time that a single task invocation can take, before
|
||||
@@ -193,7 +194,12 @@ export interface SchedulerServiceTaskScheduleDefinitionConfig {
|
||||
cron: string;
|
||||
}
|
||||
| string
|
||||
| HumanDuration;
|
||||
| HumanDuration
|
||||
/**
|
||||
* This task will only run when manually triggered with the `triggerTask` method; no automatic
|
||||
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
|
||||
*/
|
||||
| { trigger: 'manual' };
|
||||
|
||||
/**
|
||||
* The maximum amount of time that a single task invocation can take, before
|
||||
@@ -361,14 +367,20 @@ function readDuration(config: Config, key: string): HumanDuration {
|
||||
return readDurationFromConfig(config, { key });
|
||||
}
|
||||
|
||||
function readCronOrDuration(
|
||||
function readFrequency(
|
||||
config: Config,
|
||||
key: string,
|
||||
): { cron: string } | HumanDuration {
|
||||
): { cron: string } | HumanDuration | { trigger: 'manual' } {
|
||||
const value = config.get(key);
|
||||
if (typeof value === 'object' && (value as { cron?: string }).cron) {
|
||||
return value as { cron: string };
|
||||
}
|
||||
if (
|
||||
typeof value === 'object' &&
|
||||
(value as { trigger?: string }).trigger === 'manual'
|
||||
) {
|
||||
return { trigger: 'manual' };
|
||||
}
|
||||
|
||||
return readDuration(config, key);
|
||||
}
|
||||
@@ -383,7 +395,7 @@ function readCronOrDuration(
|
||||
export function readSchedulerServiceTaskScheduleDefinitionFromConfig(
|
||||
config: Config,
|
||||
): SchedulerServiceTaskScheduleDefinition {
|
||||
const frequency = readCronOrDuration(config, 'frequency');
|
||||
const frequency = readFrequency(config, 'frequency');
|
||||
const timeout = readDuration(config, 'timeout');
|
||||
|
||||
const initialDelay = config.has('initialDelay')
|
||||
|
||||
@@ -152,6 +152,9 @@ export function parseDuration(
|
||||
if ('cron' in frequency) {
|
||||
return frequency.cron;
|
||||
}
|
||||
if ('trigger' in frequency) {
|
||||
return frequency.trigger;
|
||||
}
|
||||
|
||||
const parsed = Duration.isDuration(frequency)
|
||||
? frequency
|
||||
|
||||
@@ -503,4 +503,32 @@ describe('TaskWorker', () => {
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'next_run_start_at is not set for manually-triggered tasks, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(
|
||||
async () => new Promise<void>(resolve => setTimeout(resolve, 50)),
|
||||
);
|
||||
|
||||
const initialSettings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: 'manual',
|
||||
timeoutAfterDuration: 'PT1M',
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task99', fn, knex, logger);
|
||||
await worker.persistTask(initialSettings);
|
||||
await worker.tryClaimTask('ticket', initialSettings);
|
||||
await worker.tryReleaseTask('ticket', initialSettings);
|
||||
|
||||
const row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row.next_run_start_at).toBeNull();
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -52,8 +52,8 @@ export class TaskWorker {
|
||||
);
|
||||
|
||||
let workCheckFrequency = this.workCheckFrequency;
|
||||
const isCron = !settings?.cadence.startsWith('P');
|
||||
if (!isCron) {
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
if (isDuration) {
|
||||
const cadence = Duration.fromISO(settings.cadence);
|
||||
if (cadence < workCheckFrequency) {
|
||||
workCheckFrequency = cadence;
|
||||
@@ -174,7 +174,9 @@ export class TaskWorker {
|
||||
// read it back again.
|
||||
taskSettingsV2Schema.parse(settings);
|
||||
|
||||
const isCron = !settings?.cadence.startsWith('P');
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
const isCron = !isManual && !isDuration;
|
||||
|
||||
let startAt: Knex.Raw | undefined;
|
||||
let nextStartAt: Knex.Raw | undefined;
|
||||
@@ -193,6 +195,9 @@ export class TaskWorker {
|
||||
|
||||
nextStartAt = this.nextRunAtRaw(time);
|
||||
startAt ||= nextStartAt;
|
||||
} else if (isManual) {
|
||||
nextStartAt = this.knex.raw('null');
|
||||
startAt ||= nextStartAt;
|
||||
} else {
|
||||
startAt ||= this.knex.fn.now();
|
||||
nextStartAt = nowPlus(Duration.fromISO(settings.cadence), this.knex);
|
||||
@@ -316,7 +321,9 @@ export class TaskWorker {
|
||||
ticket: string,
|
||||
settings: TaskSettingsV2,
|
||||
): Promise<boolean> {
|
||||
const isCron = !settings?.cadence.startsWith('P');
|
||||
const isManual = settings?.cadence === 'manual';
|
||||
const isDuration = settings?.cadence.startsWith('P');
|
||||
const isCron = !isManual && !isDuration;
|
||||
|
||||
let nextRun: Knex.Raw;
|
||||
if (isCron) {
|
||||
@@ -324,6 +331,8 @@ export class TaskWorker {
|
||||
this.logger.debug(`task: ${this.taskId} will next occur around ${time}`);
|
||||
|
||||
nextRun = this.nextRunAtRaw(time);
|
||||
} else if (isManual) {
|
||||
nextRun = this.knex.raw('null');
|
||||
} else {
|
||||
const dt = Duration.fromISO(settings.cadence).as('seconds');
|
||||
this.logger.debug(
|
||||
|
||||
@@ -32,14 +32,20 @@ function readDuration(config: Config, key: string): HumanDuration {
|
||||
return readDurationFromConfig(config, { key });
|
||||
}
|
||||
|
||||
function readCronOrDuration(
|
||||
function readFrequency(
|
||||
config: Config,
|
||||
key: string,
|
||||
): { cron: string } | Duration | HumanDuration {
|
||||
): { cron: string } | Duration | HumanDuration | { trigger: 'manual' } {
|
||||
const value = config.get(key);
|
||||
if (typeof value === 'object' && (value as { cron?: string }).cron) {
|
||||
return value as { cron: string };
|
||||
}
|
||||
if (
|
||||
typeof value === 'object' &&
|
||||
(value as { trigger?: string }).trigger === 'manual'
|
||||
) {
|
||||
return { trigger: 'manual' };
|
||||
}
|
||||
|
||||
return readDuration(config, key);
|
||||
}
|
||||
@@ -56,7 +62,7 @@ function readCronOrDuration(
|
||||
export function readTaskScheduleDefinitionFromConfig(
|
||||
config: Config,
|
||||
): TaskScheduleDefinition {
|
||||
const frequency = readCronOrDuration(config, 'frequency');
|
||||
const frequency = readFrequency(config, 'frequency');
|
||||
const timeout = readDuration(config, 'timeout');
|
||||
|
||||
const initialDelay = config.has('initialDelay')
|
||||
|
||||
@@ -100,7 +100,12 @@ export interface TaskScheduleDefinition {
|
||||
cron: string;
|
||||
}
|
||||
| Duration
|
||||
| HumanDuration;
|
||||
| HumanDuration
|
||||
/**
|
||||
* This task will only run when manually triggered with the `triggerTask` method; no automatic
|
||||
* scheduling. This is useful for locking of global tasks that should not be run concurrently.
|
||||
*/
|
||||
| { trigger: 'manual' };
|
||||
|
||||
/**
|
||||
* The maximum amount of time that a single task invocation can take, before
|
||||
|
||||
Reference in New Issue
Block a user