Wrap scheduler tasks in OTEL spans

Signed-off-by: Boris Bera <bbera@coveo.com>
This commit is contained in:
Boris Bera
2024-07-23 15:01:24 -04:00
parent 8711fa8004
commit 5705424dfd
3 changed files with 38 additions and 10 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/backend-defaults': patch
---
Wrap scheduled tasks from the scheduler core service now in OpenTelemetry spans
@@ -23,13 +23,21 @@ import {
SchedulerServiceTaskRunner,
SchedulerServiceTaskScheduleDefinition,
} from '@backstage/backend-plugin-api';
import { Counter, Histogram, metrics } from '@opentelemetry/api';
import {
Counter,
Histogram,
metrics,
SpanStatusCode,
trace,
} from '@opentelemetry/api';
import { Knex } from 'knex';
import { Duration } from 'luxon';
import { LocalTaskWorker } from './LocalTaskWorker';
import { TaskWorker } from './TaskWorker';
import { TaskSettingsV2 } from './types';
import { validateId } from './util';
import { TRACER_ID, validateId } from './util';
const tracer = trace.getTracer(TRACER_ID);
/**
* Implements the actual task management.
@@ -85,7 +93,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
const knex = await this.databaseFactory();
const worker = new TaskWorker(
task.id,
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
this.instrumentedFunction(task, scope),
knex,
this.logger.child({ task: task.id }),
);
@@ -93,7 +101,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
} else {
const worker = new LocalTaskWorker(
task.id,
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
this.instrumentedFunction(task, scope),
this.logger.child({ task: task.id }),
);
worker.start(settings, { signal: task.signal });
@@ -121,20 +129,33 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
return this.allScheduledTasks;
}
private wrapInMetrics(
fn: SchedulerServiceTaskFunction,
opts: { labels: Record<string, string> },
private instrumentedFunction(
task: SchedulerServiceTaskInvocationDefinition,
scope: string,
): SchedulerServiceTaskFunction {
return async abort => {
const labels = {
...opts.labels,
const labels: Record<string, string> = {
taskId: task.id,
scope,
};
this.counter.add(1, { ...labels, result: 'started' });
const startTime = process.hrtime();
try {
await fn(abort);
await tracer.startActiveSpan(`task ${task.id}`, async span => {
try {
span.setAttributes(labels);
await task.fn(abort);
} catch (error) {
if (error instanceof Error) {
span.recordException(error);
}
throw error;
} finally {
span.end();
}
});
labels.result = 'completed';
} catch (ex) {
labels.result = 'failed';
@@ -18,6 +18,8 @@ import { InputError } from '@backstage/errors';
import { Knex } from 'knex';
import { DateTime, Duration } from 'luxon';
export const TRACER_ID = 'backstage.scheduler';
// Keep the IDs compatible with e.g. Prometheus labels
export function validateId(id: string) {
if (typeof id !== 'string' || !id.trim()) {