Add cancelTask to SchedulerService for cancelling running tasks
Adds the ability to cancel currently running scheduled tasks via a new cancelTask method on the SchedulerService interface. For global (distributed) tasks, the database lock is released and a periodic liveness check detects the lost ticket and aborts the task function's AbortSignal. For local tasks, the abort signal is triggered directly. Also adds a REST endpoint at POST /.backstage/scheduler/v1/tasks/:id/cancel. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Fredrik Adelöw <freben@spotify.com>
This commit is contained in:
@@ -0,0 +1,6 @@
|
||||
---
|
||||
'@backstage/backend-plugin-api': minor
|
||||
'@backstage/backend-defaults': patch
|
||||
---
|
||||
|
||||
Added `cancelTask` method to the `SchedulerService` interface and implementation, allowing cancellation of currently running scheduled tasks. For global tasks, the database lock is released and a periodic liveness check aborts the running task function. For local tasks, the task's abort signal is triggered directly. A new `POST /.backstage/scheduler/v1/tasks/:id/cancel` endpoint is also available.
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { mockServices } from '@backstage/backend-test-utils';
|
||||
import { ConflictError } from '@backstage/errors';
|
||||
import waitFor from 'wait-for-expect';
|
||||
|
||||
jest.setTimeout(10_000);
|
||||
@@ -110,6 +111,54 @@ describe('LocalTaskWorker', () => {
|
||||
controller.abort();
|
||||
});
|
||||
|
||||
it('can cancel a running task', async () => {
|
||||
let receivedSignal: AbortSignal | undefined;
|
||||
const fn = jest.fn(async (signal: AbortSignal) => {
|
||||
receivedSignal = signal;
|
||||
await new Promise(r => setTimeout(r, 5000));
|
||||
});
|
||||
const controller = new AbortController();
|
||||
|
||||
const worker = new LocalTaskWorker('a', fn, logger);
|
||||
worker.start(
|
||||
{
|
||||
version: 2,
|
||||
cadence: 'PT10S',
|
||||
timeoutAfterDuration: 'PT10S',
|
||||
},
|
||||
{ signal: controller.signal },
|
||||
);
|
||||
|
||||
await waitFor(() => {
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
expect(receivedSignal?.aborted).toBe(false);
|
||||
worker.cancel();
|
||||
expect(receivedSignal?.aborted).toBe(true);
|
||||
|
||||
controller.abort();
|
||||
});
|
||||
|
||||
it('cannot cancel a task that is not running', async () => {
|
||||
const fn = jest.fn();
|
||||
const controller = new AbortController();
|
||||
|
||||
const worker = new LocalTaskWorker('a', fn, logger);
|
||||
worker.start(
|
||||
{
|
||||
version: 2,
|
||||
initialDelayDuration: 'PT1000S',
|
||||
cadence: 'PT10S',
|
||||
timeoutAfterDuration: 'PT10S',
|
||||
},
|
||||
{ signal: controller.signal },
|
||||
);
|
||||
|
||||
expect(() => worker.cancel()).toThrow(ConflictError);
|
||||
controller.abort();
|
||||
});
|
||||
|
||||
it('goes through the expected states', async () => {
|
||||
const fn = jest
|
||||
.fn()
|
||||
|
||||
@@ -29,6 +29,7 @@ import { delegateAbortController, serializeError, sleep } from './util';
|
||||
*/
|
||||
export class LocalTaskWorker {
|
||||
private abortWait: AbortController | undefined;
|
||||
private taskAbortController: AbortController | undefined;
|
||||
#taskState: Exclude<TaskApiTasksResponse['taskState'], null> = {
|
||||
status: 'idle',
|
||||
};
|
||||
@@ -93,6 +94,13 @@ export class LocalTaskWorker {
|
||||
this.abortWait.abort();
|
||||
}
|
||||
|
||||
cancel(): void {
|
||||
if (!this.taskAbortController) {
|
||||
throw new ConflictError(`Task ${this.taskId} is not running`);
|
||||
}
|
||||
this.taskAbortController.abort();
|
||||
}
|
||||
|
||||
taskState(): TaskApiTasksResponse['taskState'] {
|
||||
return this.#taskState;
|
||||
}
|
||||
@@ -134,10 +142,10 @@ export class LocalTaskWorker {
|
||||
): Promise<void> {
|
||||
// Abort the task execution either if the worker is stopped, or if the
|
||||
// task timeout is hit
|
||||
const taskAbortController = delegateAbortController(signal);
|
||||
this.taskAbortController = delegateAbortController(signal);
|
||||
const timeoutDuration = Duration.fromISO(settings.timeoutAfterDuration);
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
taskAbortController.abort();
|
||||
this.taskAbortController?.abort();
|
||||
}, timeoutDuration.as('milliseconds'));
|
||||
|
||||
this.#taskState = {
|
||||
@@ -152,7 +160,7 @@ export class LocalTaskWorker {
|
||||
};
|
||||
|
||||
try {
|
||||
await this.fn(taskAbortController.signal);
|
||||
await this.fn(this.taskAbortController.signal);
|
||||
this.#taskState.lastRunEndedAt = DateTime.utc().toISO()!;
|
||||
this.#taskState.lastRunError = undefined;
|
||||
} catch (e) {
|
||||
@@ -162,7 +170,8 @@ export class LocalTaskWorker {
|
||||
|
||||
// release resources
|
||||
clearTimeout(timeoutHandle);
|
||||
taskAbortController.abort();
|
||||
this.taskAbortController.abort();
|
||||
this.taskAbortController = undefined;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
+96
@@ -399,6 +399,102 @@ describe('PluginTaskManagerImpl', () => {
|
||||
);
|
||||
});
|
||||
|
||||
describe('cancelTask with local scope', () => {
|
||||
it('can cancel a running task', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const promise = createDeferred();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
fn: async () => {
|
||||
promise.resolve();
|
||||
await new Promise(r => setTimeout(r, 20000));
|
||||
},
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await promise;
|
||||
await expect(manager.cancelTask('task1')).resolves.toBeUndefined();
|
||||
}, 60_000);
|
||||
|
||||
it('cannot cancel a task that is not running', async () => {
|
||||
const { manager } = await init('SQLITE_3');
|
||||
|
||||
const fn = jest.fn();
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
initialDelay: Duration.fromObject({ years: 1 }),
|
||||
fn,
|
||||
scope: 'local',
|
||||
});
|
||||
|
||||
await expect(manager.cancelTask('task1')).rejects.toThrow(ConflictError);
|
||||
}, 60_000);
|
||||
});
|
||||
|
||||
describe('cancelTask with global scope', () => {
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can cancel a running task, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const promise = createDeferred();
|
||||
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
fn: async () => {
|
||||
promise.resolve();
|
||||
await new Promise(r => setTimeout(r, 20000));
|
||||
},
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await promise;
|
||||
await expect(manager.cancelTask('task1')).resolves.toBeUndefined();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'cannot cancel a non-existent task, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
await expect(manager.cancelTask('nonexistent')).rejects.toThrow(
|
||||
NotFoundError,
|
||||
);
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'cannot cancel a task that is not running, %p',
|
||||
async databaseId => {
|
||||
const { manager } = await init(databaseId);
|
||||
|
||||
const fn = jest.fn();
|
||||
const promise = new Promise(resolve => fn.mockImplementation(resolve));
|
||||
await manager.scheduleTask({
|
||||
id: 'task1',
|
||||
timeout: Duration.fromMillis(5000),
|
||||
frequency: Duration.fromObject({ years: 1 }),
|
||||
initialDelay: Duration.fromObject({ years: 1 }),
|
||||
fn,
|
||||
scope: 'global',
|
||||
});
|
||||
|
||||
await expect(manager.cancelTask('task1')).rejects.toThrow(
|
||||
ConflictError,
|
||||
);
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
describe('parseDuration', () => {
|
||||
it('should parse durations', () => {
|
||||
expect(parseDuration({ milliseconds: 5000 })).toEqual('PT5S');
|
||||
|
||||
@@ -107,6 +107,17 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
await TaskWorker.trigger(knex, id);
|
||||
}
|
||||
|
||||
async cancelTask(id: string): Promise<void> {
|
||||
const localTask = this.localWorkersById.get(id);
|
||||
if (localTask) {
|
||||
localTask.cancel();
|
||||
return;
|
||||
}
|
||||
|
||||
const knex = await this.databaseFactory();
|
||||
await TaskWorker.cancel(knex, id);
|
||||
}
|
||||
|
||||
async scheduleTask(
|
||||
task: SchedulerServiceTaskScheduleDefinition &
|
||||
SchedulerServiceTaskInvocationDefinition,
|
||||
@@ -206,6 +217,15 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
},
|
||||
);
|
||||
|
||||
router.post(
|
||||
'/.backstage/scheduler/v1/tasks/:id/cancel',
|
||||
async (req, res) => {
|
||||
const { id } = req.params;
|
||||
await this.cancelTask(id);
|
||||
res.status(200).end();
|
||||
},
|
||||
);
|
||||
|
||||
return router;
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
import { TestDatabases, mockServices } from '@backstage/backend-test-utils';
|
||||
import { ConflictError, NotFoundError } from '@backstage/errors';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
import waitForExpect from 'wait-for-expect';
|
||||
import { migrateBackendTasks } from '../database/migrateBackendTasks';
|
||||
@@ -584,4 +585,79 @@ describe('TaskWorker', () => {
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'can cancel a running task, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(async () => {});
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: '* * * * * *',
|
||||
initialDelayDuration: undefined,
|
||||
timeoutAfterDuration: Duration.fromObject({ minutes: 1 }).toISO()!,
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task1', fn, knex, logger);
|
||||
await worker.persistTask(settings);
|
||||
await worker.tryClaimTask('ticket', settings);
|
||||
|
||||
// Verify the task is running
|
||||
let row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row.current_run_ticket).toBe('ticket');
|
||||
|
||||
await TaskWorker.cancel(knex, 'task1');
|
||||
|
||||
// Verify the task is now idle with a cancellation error recorded
|
||||
row = (await knex<DbTasksRow>(DB_TASKS_TABLE))[0];
|
||||
expect(row.current_run_ticket).toBeNull();
|
||||
expect(row.current_run_started_at).toBeNull();
|
||||
expect(row.current_run_expires_at).toBeNull();
|
||||
expect(row.last_run_ended_at).not.toBeNull();
|
||||
expect(row.last_run_error_json).toContain('Task was cancelled');
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'cannot cancel a non-existent task, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
await expect(TaskWorker.cancel(knex, 'nonexistent')).rejects.toThrow(
|
||||
NotFoundError,
|
||||
);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
|
||||
it.each(databases.eachSupportedId())(
|
||||
'cannot cancel a task that is not running, %p',
|
||||
async databaseId => {
|
||||
const knex = await databases.init(databaseId);
|
||||
await migrateBackendTasks(knex);
|
||||
|
||||
const fn = jest.fn(async () => {});
|
||||
const settings: TaskSettingsV2 = {
|
||||
version: 2,
|
||||
cadence: '* * * * * *',
|
||||
initialDelayDuration: undefined,
|
||||
timeoutAfterDuration: Duration.fromObject({ minutes: 1 }).toISO()!,
|
||||
};
|
||||
|
||||
const worker = new TaskWorker('task1', fn, knex, logger);
|
||||
await worker.persistTask(settings);
|
||||
|
||||
await expect(TaskWorker.cancel(knex, 'task1')).rejects.toThrow(
|
||||
ConflictError,
|
||||
);
|
||||
|
||||
await knex.destroy();
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
@@ -152,6 +152,31 @@ export class TaskWorker {
|
||||
}
|
||||
}
|
||||
|
||||
static async cancel(knex: Knex, taskId: string): Promise<void> {
|
||||
// check if task exists
|
||||
const rows = await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.select(knex.raw(1))
|
||||
.where('id', '=', taskId);
|
||||
if (rows.length !== 1) {
|
||||
throw new NotFoundError(`Task ${taskId} does not exist`);
|
||||
}
|
||||
|
||||
const dbNull = knex.raw('null');
|
||||
const updatedRows = await knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', taskId)
|
||||
.whereNotNull('current_run_ticket')
|
||||
.update({
|
||||
current_run_ticket: dbNull,
|
||||
current_run_started_at: dbNull,
|
||||
current_run_expires_at: dbNull,
|
||||
last_run_ended_at: knex.fn.now(),
|
||||
last_run_error_json: serializeError(new Error('Task was cancelled')),
|
||||
});
|
||||
if (updatedRows < 1) {
|
||||
throw new ConflictError(`Task ${taskId} is not running`);
|
||||
}
|
||||
}
|
||||
|
||||
static async taskStates(
|
||||
knex: Knex,
|
||||
): Promise<Map<string, TaskApiTasksResponse['taskState']>> {
|
||||
@@ -227,11 +252,16 @@ export class TaskWorker {
|
||||
}
|
||||
|
||||
// Abort the task execution either if the worker is stopped, or if the
|
||||
// task timeout is hit
|
||||
// task timeout is hit, or if the task ticket was lost (e.g. due to
|
||||
// cancellation from another host)
|
||||
const taskAbortController = delegateAbortController(signal);
|
||||
const timeoutHandle = setTimeout(() => {
|
||||
taskAbortController.abort();
|
||||
}, Duration.fromISO(taskSettings.timeoutAfterDuration).as('milliseconds'));
|
||||
const livenessHandle = setInterval(
|
||||
() => this.checkLiveness(ticket, taskAbortController),
|
||||
this.workCheckFrequency.as('milliseconds'),
|
||||
);
|
||||
|
||||
try {
|
||||
this.#workerState = {
|
||||
@@ -248,6 +278,7 @@ export class TaskWorker {
|
||||
status: 'idle',
|
||||
};
|
||||
clearTimeout(timeoutHandle);
|
||||
clearInterval(livenessHandle);
|
||||
}
|
||||
|
||||
await this.tryReleaseTask(ticket, taskSettings);
|
||||
@@ -334,6 +365,33 @@ export class TaskWorker {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the current task ticket is still valid in the database.
|
||||
* If the ticket has been cleared (e.g. by cancellation or janitor cleanup),
|
||||
* aborts the task execution.
|
||||
*/
|
||||
private async checkLiveness(
|
||||
ticket: string,
|
||||
taskAbortController: AbortController,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const [row] = await this.knex<DbTasksRow>(DB_TASKS_TABLE)
|
||||
.where('id', '=', this.taskId)
|
||||
.select('current_run_ticket');
|
||||
|
||||
if (!row || row.current_run_ticket !== ticket) {
|
||||
this.logger.info(
|
||||
`Task ticket for "${this.taskId}" is no longer valid; aborting execution`,
|
||||
);
|
||||
taskAbortController.abort();
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.warn(
|
||||
`Failed to check liveness for task "${this.taskId}", ${e}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the task is ready to run
|
||||
*/
|
||||
|
||||
@@ -304,6 +304,16 @@ export interface SchedulerService {
|
||||
*/
|
||||
triggerTask(id: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* Cancels a currently running task by ID, marking it as idle.
|
||||
*
|
||||
* If the task doesn't exist, a NotFoundError is thrown. If the task is
|
||||
* not currently running, a ConflictError is thrown.
|
||||
*
|
||||
* @param id - The task ID
|
||||
*/
|
||||
cancelTask(id: string): Promise<void>;
|
||||
|
||||
/**
|
||||
* Schedules a task function for recurring runs.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user