Merge pull request #32946 from backstage/freben/scm-metrics
catalog: add metrics to the SCM events handling
This commit is contained in:
@@ -0,0 +1,7 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-backend': minor
|
||||
---
|
||||
|
||||
Added opentelemetry metrics for SCM events:
|
||||
|
||||
- `catalog.events.scm.messages` with attribute `eventType`: Counter for the number of SCM events actually received by the catalog backend. The `eventType` is currently either `location` or `repository`.
|
||||
@@ -0,0 +1,7 @@
|
||||
---
|
||||
'@backstage/plugin-catalog-node': minor
|
||||
---
|
||||
|
||||
Added the ability for SCM events subscribers to mark the fact that they have taken actions based on events, which produces output metrics:
|
||||
|
||||
- `catalog.events.scm.actions` with attribute `action`: Counter for the number of actions actually taken by catalog internals or other subscribers, based on SCM events. The `action` is currently either `create`, `delete`, `refresh`, or `move`.
|
||||
@@ -322,6 +322,7 @@ openapi
|
||||
OpenSearch
|
||||
OpenShift
|
||||
openssl
|
||||
opentelemetry
|
||||
orgs
|
||||
overridable
|
||||
padding
|
||||
|
||||
@@ -35,6 +35,7 @@ describe('DefaultLocationStore', () => {
|
||||
const mockScmEvents = {
|
||||
subscribe: jest.fn(),
|
||||
publish: jest.fn(),
|
||||
markEventActionTaken: jest.fn(),
|
||||
};
|
||||
let subscriber: CatalogScmEventsServiceSubscriber | undefined;
|
||||
|
||||
@@ -362,6 +363,11 @@ describe('DefaultLocationStore', () => {
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expect(mockScmEvents.markEventActionTaken).toHaveBeenCalledWith({
|
||||
count: 1,
|
||||
action: 'delete',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -483,6 +489,15 @@ describe('DefaultLocationStore', () => {
|
||||
],
|
||||
removed: [],
|
||||
});
|
||||
|
||||
expect(mockScmEvents.markEventActionTaken).toHaveBeenCalledWith({
|
||||
count: 1,
|
||||
action: 'delete',
|
||||
});
|
||||
expect(mockScmEvents.markEventActionTaken).toHaveBeenCalledWith({
|
||||
count: 1,
|
||||
action: 'create',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -589,6 +604,11 @@ describe('DefaultLocationStore', () => {
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
expect(mockScmEvents.markEventActionTaken).toHaveBeenCalledWith({
|
||||
count: 1,
|
||||
action: 'delete',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -709,6 +729,11 @@ describe('DefaultLocationStore', () => {
|
||||
],
|
||||
removed: [],
|
||||
});
|
||||
|
||||
expect(mockScmEvents.markEventActionTaken).toHaveBeenCalledWith({
|
||||
count: 1,
|
||||
action: 'move',
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -305,16 +305,40 @@ export class DefaultLocationStore implements LocationStore, EntityProvider {
|
||||
}
|
||||
|
||||
if (exactLocationsToDelete.size > 0) {
|
||||
await this.#deleteLocationsByExactUrl(exactLocationsToDelete);
|
||||
const count = await this.#deleteLocationsByExactUrl(
|
||||
exactLocationsToDelete,
|
||||
);
|
||||
this.scmEvents.markEventActionTaken({
|
||||
count,
|
||||
action: 'delete',
|
||||
});
|
||||
}
|
||||
if (locationPrefixesToDelete.size > 0) {
|
||||
await this.#deleteLocationsByUrlPrefix(locationPrefixesToDelete);
|
||||
const count = await this.#deleteLocationsByUrlPrefix(
|
||||
locationPrefixesToDelete,
|
||||
);
|
||||
this.scmEvents.markEventActionTaken({
|
||||
count,
|
||||
action: 'delete',
|
||||
});
|
||||
}
|
||||
if (exactLocationsToCreate.size > 0) {
|
||||
await this.#createLocationsByExactUrl(exactLocationsToCreate);
|
||||
const count = await this.#createLocationsByExactUrl(
|
||||
exactLocationsToCreate,
|
||||
);
|
||||
this.scmEvents.markEventActionTaken({
|
||||
count,
|
||||
action: 'create',
|
||||
});
|
||||
}
|
||||
if (locationPrefixesToMove.size > 0) {
|
||||
await this.#moveLocationsByUrlPrefix(locationPrefixesToMove);
|
||||
const count = await this.#moveLocationsByUrlPrefix(
|
||||
locationPrefixesToMove,
|
||||
);
|
||||
this.scmEvents.markEventActionTaken({
|
||||
count,
|
||||
action: 'move',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ describe('GenericScmEventRefreshProvider', () => {
|
||||
return { unsubscribe: () => {} };
|
||||
}),
|
||||
publish: jest.fn(),
|
||||
markEventActionTaken: jest.fn(),
|
||||
};
|
||||
|
||||
const store = new GenericScmEventRefreshProvider(knex, scmEvents, {
|
||||
|
||||
@@ -150,6 +150,8 @@ export class GenericScmEventRefreshProvider implements EntityProvider {
|
||||
|
||||
count += Number(result);
|
||||
}
|
||||
|
||||
this.#scmEvents.markEventActionTaken({ count, action: 'refresh' });
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ import {
|
||||
} from '@backstage/plugin-catalog-node/alpha';
|
||||
import { eventsServiceRef } from '@backstage/plugin-events-node';
|
||||
import { Permission } from '@backstage/plugin-permission-common';
|
||||
import { metrics } from '@opentelemetry/api';
|
||||
import { merge } from 'lodash';
|
||||
import { CatalogBuilder } from './CatalogBuilder';
|
||||
import { actionsRegistryServiceRef } from '@backstage/backend-plugin-api/alpha';
|
||||
@@ -301,6 +302,24 @@ export const catalogPlugin = createBackendPlugin({
|
||||
catalog,
|
||||
actionsRegistry,
|
||||
});
|
||||
|
||||
// Track SCM event message counts as a metric
|
||||
const meter = metrics.getMeter('default');
|
||||
const scmEventsMessagesCounter = meter.createCounter<{
|
||||
eventType: string;
|
||||
}>('catalog.events.scm.messages', {
|
||||
description:
|
||||
'Number of SCM event messages received by the catalog backend',
|
||||
unit: 'short',
|
||||
});
|
||||
catalogScmEvents.subscribe({
|
||||
onEvents: async e => {
|
||||
for (const event of e) {
|
||||
const eventType = event.type.split('.')[0];
|
||||
scmEventsMessagesCounter.add(1, { eventType });
|
||||
}
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
},
|
||||
|
||||
@@ -1429,6 +1429,7 @@ describe('POST /locations/by-query works end to end', () => {
|
||||
const mockScmEvents = {
|
||||
subscribe: jest.fn(),
|
||||
publish: jest.fn(),
|
||||
markEventActionTaken: jest.fn(),
|
||||
};
|
||||
|
||||
const store = new DefaultLocationStore(knex, mockScmEvents, {
|
||||
|
||||
@@ -68,6 +68,7 @@
|
||||
"@backstage/plugin-permission-common": "workspace:^",
|
||||
"@backstage/plugin-permission-node": "workspace:^",
|
||||
"@backstage/types": "workspace:^",
|
||||
"@opentelemetry/api": "^1.9.0",
|
||||
"lodash": "^4.17.21",
|
||||
"yaml": "^2.0.0"
|
||||
},
|
||||
|
||||
@@ -125,6 +125,7 @@ export type CatalogScmEventContext = {
|
||||
|
||||
// @alpha
|
||||
export interface CatalogScmEventsService {
|
||||
markEventActionTaken(options: { count?: number; action: string }): void;
|
||||
publish(events: CatalogScmEvent[]): Promise<void>;
|
||||
subscribe(subscriber: CatalogScmEventsServiceSubscriber): {
|
||||
unsubscribe: () => void;
|
||||
|
||||
@@ -15,11 +15,21 @@
|
||||
*/
|
||||
|
||||
import { createDeferred } from '@backstage/types';
|
||||
import { MetricsAPI } from '@opentelemetry/api';
|
||||
import { DefaultCatalogScmEventsService } from './DefaultCatalogScmEventsService';
|
||||
|
||||
describe('DefaultCatalogScmEventsService', () => {
|
||||
const counterAdd = jest.fn();
|
||||
const mockMetrics = {
|
||||
getMeter: () => ({
|
||||
createCounter: () => ({
|
||||
add: counterAdd,
|
||||
}),
|
||||
}),
|
||||
} as unknown as MetricsAPI;
|
||||
|
||||
it('should publish and subscribe to events', async () => {
|
||||
const service = new DefaultCatalogScmEventsService();
|
||||
const service = new DefaultCatalogScmEventsService(mockMetrics);
|
||||
|
||||
const subscriber1 = {
|
||||
onEvents: jest.fn(),
|
||||
@@ -53,7 +63,7 @@ describe('DefaultCatalogScmEventsService', () => {
|
||||
});
|
||||
|
||||
it('waits for all subscribers to acknowledge the events', async () => {
|
||||
const service = new DefaultCatalogScmEventsService();
|
||||
const service = new DefaultCatalogScmEventsService(mockMetrics);
|
||||
|
||||
const work1 = createDeferred<void>();
|
||||
const work2 = createDeferred<void>();
|
||||
@@ -102,4 +112,12 @@ describe('DefaultCatalogScmEventsService', () => {
|
||||
|
||||
expect(completed).toBe(true);
|
||||
});
|
||||
|
||||
it('marks event actions taken', () => {
|
||||
const service = new DefaultCatalogScmEventsService(mockMetrics);
|
||||
|
||||
service.markEventActionTaken({ action: 'refresh' });
|
||||
|
||||
expect(counterAdd).toHaveBeenCalledWith(1, { action: 'refresh' });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { Counter, MetricsAPI } from '@opentelemetry/api';
|
||||
import {
|
||||
CatalogScmEvent,
|
||||
CatalogScmEventsService,
|
||||
@@ -21,19 +22,36 @@ import {
|
||||
} from './types';
|
||||
|
||||
/**
|
||||
* The default implementation of the {@link CatalogScmEventsService}/{@link catalogScmEventsServiceRef}.
|
||||
* The default implementation of the
|
||||
* {@link CatalogScmEventsService}/{@link catalogScmEventsServiceRef}.
|
||||
*
|
||||
* @internal
|
||||
* @remarks
|
||||
*
|
||||
* This implementation is in-memory, which requires the producers and consumer
|
||||
* (the catalog backend) to be deployed together.
|
||||
*
|
||||
* It's defined in here instead of in the catalog-backend plugin because this
|
||||
* allows us to have a default factory whether you happen to be co-installed
|
||||
* with the catalog-backend plugin or not.
|
||||
*/
|
||||
export class DefaultCatalogScmEventsService implements CatalogScmEventsService {
|
||||
readonly #subscribers: Set<CatalogScmEventsServiceSubscriber>;
|
||||
readonly #metrics: {
|
||||
actions: Counter<{ action: string }>;
|
||||
};
|
||||
|
||||
constructor() {
|
||||
constructor(metrics: MetricsAPI) {
|
||||
this.#subscribers = new Set();
|
||||
|
||||
const meter = metrics.getMeter('default');
|
||||
this.#metrics = {
|
||||
actions: meter.createCounter('catalog.events.scm.actions', {
|
||||
description:
|
||||
'Number of actions taken as a result of SCM event messages',
|
||||
unit: 'short',
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
subscribe(subscriber: CatalogScmEventsServiceSubscriber): {
|
||||
@@ -58,4 +76,8 @@ export class DefaultCatalogScmEventsService implements CatalogScmEventsService {
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
markEventActionTaken(options: { count?: number; action: string }): void {
|
||||
this.#metrics.actions.add(options.count ?? 1, { action: options.action });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import {
|
||||
createServiceFactory,
|
||||
createServiceRef,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { metrics } from '@opentelemetry/api';
|
||||
import { CatalogScmEventsService } from './types';
|
||||
import { DefaultCatalogScmEventsService } from './DefaultCatalogScmEventsService';
|
||||
|
||||
@@ -39,7 +40,7 @@ export const catalogScmEventsServiceRef =
|
||||
service,
|
||||
deps: {},
|
||||
createRootContext() {
|
||||
return new DefaultCatalogScmEventsService();
|
||||
return new DefaultCatalogScmEventsService(metrics);
|
||||
},
|
||||
factory(_, ctx) {
|
||||
return ctx;
|
||||
|
||||
@@ -55,6 +55,26 @@ export interface CatalogScmEventsService {
|
||||
* guarantees.
|
||||
*/
|
||||
publish(events: CatalogScmEvent[]): Promise<void>;
|
||||
|
||||
/**
|
||||
* As a consumer of SCM events, mark that you have taken an action as a result
|
||||
* of an SCM event.
|
||||
*
|
||||
* This is typically used to record metrics or other observability signals
|
||||
* about how SCM events are handled, for example counting how many refresh,
|
||||
* delete, create, or move operations are triggered by incoming events.
|
||||
*/
|
||||
markEventActionTaken(options: {
|
||||
/**
|
||||
* The number of actions taken of the given type. Defaults to 1.
|
||||
*/
|
||||
count?: number;
|
||||
/**
|
||||
* The type of action taken - typically "refresh", "delete",
|
||||
* "create", or "move".
|
||||
*/
|
||||
action: string;
|
||||
}): void;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
Reference in New Issue
Block a user