Wrap scheduler tasks in OTEL spans
Signed-off-by: Boris Bera <bbera@coveo.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/backend-defaults': patch
|
||||
---
|
||||
|
||||
Wrap scheduled tasks from the scheduler core service now in OpenTelemetry spans
|
||||
+31
-10
@@ -23,13 +23,21 @@ import {
|
||||
SchedulerServiceTaskRunner,
|
||||
SchedulerServiceTaskScheduleDefinition,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { Counter, Histogram, metrics } from '@opentelemetry/api';
|
||||
import {
|
||||
Counter,
|
||||
Histogram,
|
||||
metrics,
|
||||
SpanStatusCode,
|
||||
trace,
|
||||
} from '@opentelemetry/api';
|
||||
import { Knex } from 'knex';
|
||||
import { Duration } from 'luxon';
|
||||
import { LocalTaskWorker } from './LocalTaskWorker';
|
||||
import { TaskWorker } from './TaskWorker';
|
||||
import { TaskSettingsV2 } from './types';
|
||||
import { validateId } from './util';
|
||||
import { TRACER_ID, validateId } from './util';
|
||||
|
||||
const tracer = trace.getTracer(TRACER_ID);
|
||||
|
||||
/**
|
||||
* Implements the actual task management.
|
||||
@@ -85,7 +93,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
const knex = await this.databaseFactory();
|
||||
const worker = new TaskWorker(
|
||||
task.id,
|
||||
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
|
||||
this.instrumentedFunction(task, scope),
|
||||
knex,
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
@@ -93,7 +101,7 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
} else {
|
||||
const worker = new LocalTaskWorker(
|
||||
task.id,
|
||||
this.wrapInMetrics(task.fn, { labels: { taskId: task.id, scope } }),
|
||||
this.instrumentedFunction(task, scope),
|
||||
this.logger.child({ task: task.id }),
|
||||
);
|
||||
worker.start(settings, { signal: task.signal });
|
||||
@@ -121,20 +129,33 @@ export class PluginTaskSchedulerImpl implements SchedulerService {
|
||||
return this.allScheduledTasks;
|
||||
}
|
||||
|
||||
private wrapInMetrics(
|
||||
fn: SchedulerServiceTaskFunction,
|
||||
opts: { labels: Record<string, string> },
|
||||
private instrumentedFunction(
|
||||
task: SchedulerServiceTaskInvocationDefinition,
|
||||
scope: string,
|
||||
): SchedulerServiceTaskFunction {
|
||||
return async abort => {
|
||||
const labels = {
|
||||
...opts.labels,
|
||||
const labels: Record<string, string> = {
|
||||
taskId: task.id,
|
||||
scope,
|
||||
};
|
||||
this.counter.add(1, { ...labels, result: 'started' });
|
||||
|
||||
const startTime = process.hrtime();
|
||||
|
||||
try {
|
||||
await fn(abort);
|
||||
await tracer.startActiveSpan(`task ${task.id}`, async span => {
|
||||
try {
|
||||
span.setAttributes(labels);
|
||||
await task.fn(abort);
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
span.recordException(error);
|
||||
}
|
||||
throw error;
|
||||
} finally {
|
||||
span.end();
|
||||
}
|
||||
});
|
||||
labels.result = 'completed';
|
||||
} catch (ex) {
|
||||
labels.result = 'failed';
|
||||
|
||||
@@ -18,6 +18,8 @@ import { InputError } from '@backstage/errors';
|
||||
import { Knex } from 'knex';
|
||||
import { DateTime, Duration } from 'luxon';
|
||||
|
||||
export const TRACER_ID = 'backstage.scheduler';
|
||||
|
||||
// Keep the IDs compatible with e.g. Prometheus labels
|
||||
export function validateId(id: string) {
|
||||
if (typeof id !== 'string' || !id.trim()) {
|
||||
|
||||
Reference in New Issue
Block a user