feat(events): add new events service
Signed-off-by: Patrick Jungermann <Patrick.Jungermann@gmail.com>
This commit is contained in:
@@ -0,0 +1,39 @@
|
||||
---
|
||||
'@backstage/plugin-events-backend-test-utils': patch
|
||||
'@backstage/plugin-events-backend': patch
|
||||
'@backstage/plugin-events-node': patch
|
||||
---
|
||||
|
||||
Add new `EventsService` as well as `eventsServiceRef` for the new backend system.
|
||||
|
||||
**Summary:**
|
||||
|
||||
- new:
|
||||
`EventsService`, `eventsServiceRef`, `TestEventsService`
|
||||
- deprecated:
|
||||
`EventBroker`, `EventPublisher`, `EventSubscriber`, `DefaultEventBroker`, `EventsBackend`,
|
||||
most parts of `EventsExtensionPoint` (alpha),
|
||||
`TestEventBroker`, `TestEventPublisher`, `TestEventSubscriber`
|
||||
|
||||
Add the `eventsServiceRef` as dependency to your backend plugins
|
||||
or backend plugin modules.
|
||||
|
||||
**Details:**
|
||||
|
||||
The previous implementation using the `EventsExtensionPoint` was added in the early stages
|
||||
of the new backend system and does not respect the plugin isolation.
|
||||
This made it not compatible anymore with the new backend system.
|
||||
|
||||
Additionally, the previous interfaces had some room for simplification,
|
||||
supporting less exposure of internal concerns as well.
|
||||
|
||||
Hereby, this change adds a new `EventsService` interface as replacement for the now deprecated `EventBroker`.
|
||||
The new interface does not require any `EventPublisher` or `EventSubscriber` interfaces anymore.
|
||||
Instead, it is expected that the `EventsService` gets passed into publishers and subscribers,
|
||||
and used internally. There is no need to expose anything of that at their own interfaces.
|
||||
|
||||
Most parts of `EventsExtensionPoint` (alpha) are deprecated as well and were not usable
|
||||
(by other plugins or their modules) anyway.
|
||||
|
||||
The `DefaultEventBroker` implementation is deprecated and wraps the new `DefaultEventsService` implementation.
|
||||
Optionally, an instance can be passed as argument to allow mixed setups to operate alongside.
|
||||
@@ -6,9 +6,11 @@
|
||||
import { EventBroker } from '@backstage/plugin-events-node';
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import { EventPublisher } from '@backstage/plugin-events-node';
|
||||
import { EventsService } from '@backstage/plugin-events-node';
|
||||
import { EventsServiceSubscribeOptions } from '@backstage/plugin-events-node';
|
||||
import { EventSubscriber } from '@backstage/plugin-events-node';
|
||||
|
||||
// @public (undocumented)
|
||||
// @public @deprecated (undocumented)
|
||||
export class TestEventBroker implements EventBroker {
|
||||
// (undocumented)
|
||||
publish(params: EventParams): Promise<void>;
|
||||
@@ -22,7 +24,7 @@ export class TestEventBroker implements EventBroker {
|
||||
readonly subscribed: EventSubscriber[];
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
// @public @deprecated (undocumented)
|
||||
export class TestEventPublisher implements EventPublisher {
|
||||
// (undocumented)
|
||||
get eventBroker(): EventBroker | undefined;
|
||||
@@ -31,6 +33,20 @@ export class TestEventPublisher implements EventPublisher {
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export class TestEventsService implements EventsService {
|
||||
// (undocumented)
|
||||
publish(params: EventParams): Promise<void>;
|
||||
// (undocumented)
|
||||
get published(): EventParams[];
|
||||
// (undocumented)
|
||||
reset(): void;
|
||||
// (undocumented)
|
||||
subscribe(options: EventsServiceSubscribeOptions): Promise<void>;
|
||||
// (undocumented)
|
||||
get subscribed(): EventsServiceSubscribeOptions[];
|
||||
}
|
||||
|
||||
// @public @deprecated (undocumented)
|
||||
export class TestEventSubscriber implements EventSubscriber {
|
||||
constructor(name: string, topics: string[]);
|
||||
// (undocumented)
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export { TestEventBroker } from './testUtils/TestEventBroker';
|
||||
export { TestEventPublisher } from './testUtils/TestEventPublisher';
|
||||
export { TestEventSubscriber } from './testUtils/TestEventSubscriber';
|
||||
@@ -20,4 +20,5 @@
|
||||
* @packageDocumentation
|
||||
*/
|
||||
|
||||
export * from './deprecated';
|
||||
export * from './testUtils';
|
||||
|
||||
@@ -20,7 +20,10 @@ import {
|
||||
EventSubscriber,
|
||||
} from '@backstage/plugin-events-node';
|
||||
|
||||
/** @public */
|
||||
/**
|
||||
* @public
|
||||
* @deprecated use `TestEventsService` instead
|
||||
*/
|
||||
export class TestEventBroker implements EventBroker {
|
||||
readonly published: EventParams[] = [];
|
||||
readonly subscribed: EventSubscriber[] = [];
|
||||
|
||||
@@ -16,7 +16,10 @@
|
||||
|
||||
import { EventBroker, EventPublisher } from '@backstage/plugin-events-node';
|
||||
|
||||
/** @public */
|
||||
/**
|
||||
* @public
|
||||
* @deprecated `EventPublisher` was replaced by `EventsService.publish`
|
||||
*/
|
||||
export class TestEventPublisher implements EventPublisher {
|
||||
#eventBroker?: EventBroker;
|
||||
|
||||
|
||||
@@ -16,7 +16,10 @@
|
||||
|
||||
import { EventParams, EventSubscriber } from '@backstage/plugin-events-node';
|
||||
|
||||
/** @public */
|
||||
/**
|
||||
* @public
|
||||
* @deprecated `EventSubscriber` was replaced by `EventsService.subscribe`.
|
||||
*/
|
||||
export class TestEventSubscriber implements EventSubscriber {
|
||||
readonly name: string;
|
||||
readonly topics: string[];
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
EventParams,
|
||||
EventsService,
|
||||
EventsServiceSubscribeOptions,
|
||||
} from '@backstage/plugin-events-node';
|
||||
|
||||
/** @public */
|
||||
export class TestEventsService implements EventsService {
|
||||
#published: EventParams[] = [];
|
||||
#subscribed: EventsServiceSubscribeOptions[] = [];
|
||||
|
||||
async publish(params: EventParams): Promise<void> {
|
||||
this.#published.push(params);
|
||||
}
|
||||
|
||||
async subscribe(options: EventsServiceSubscribeOptions): Promise<void> {
|
||||
this.#subscribed.push(options);
|
||||
}
|
||||
|
||||
get published(): EventParams[] {
|
||||
return this.#published;
|
||||
}
|
||||
|
||||
get subscribed(): EventsServiceSubscribeOptions[] {
|
||||
return this.#subscribed;
|
||||
}
|
||||
|
||||
reset(): void {
|
||||
this.#published = [];
|
||||
this.#subscribed = [];
|
||||
}
|
||||
}
|
||||
@@ -14,6 +14,4 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export { TestEventBroker } from './TestEventBroker';
|
||||
export { TestEventPublisher } from './TestEventPublisher';
|
||||
export { TestEventSubscriber } from './TestEventSubscriber';
|
||||
export { TestEventsService } from './TestEventsService';
|
||||
|
||||
@@ -7,14 +7,17 @@ import { Config } from '@backstage/config';
|
||||
import { EventBroker } from '@backstage/plugin-events-node';
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import { EventPublisher } from '@backstage/plugin-events-node';
|
||||
import { EventsService } from '@backstage/plugin-events-node';
|
||||
import { EventSubscriber } from '@backstage/plugin-events-node';
|
||||
import express from 'express';
|
||||
import { HttpPostIngressOptions } from '@backstage/plugin-events-node';
|
||||
import { Logger } from 'winston';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
|
||||
// @public
|
||||
// @public @deprecated
|
||||
export class DefaultEventBroker implements EventBroker {
|
||||
constructor(logger: Logger);
|
||||
// @deprecated
|
||||
constructor(logger: LoggerService, events?: EventsService);
|
||||
// (undocumented)
|
||||
publish(params: EventParams): Promise<void>;
|
||||
// (undocumented)
|
||||
@@ -23,7 +26,7 @@ export class DefaultEventBroker implements EventBroker {
|
||||
): void;
|
||||
}
|
||||
|
||||
// @public
|
||||
// @public @deprecated
|
||||
export class EventsBackend {
|
||||
constructor(logger: Logger);
|
||||
// (undocumented)
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export { EventsBackend } from './service/EventsBackend';
|
||||
export { DefaultEventBroker } from './service/DefaultEventBroker';
|
||||
@@ -20,6 +20,5 @@
|
||||
* @packageDocumentation
|
||||
*/
|
||||
|
||||
export { EventsBackend } from './service/EventsBackend';
|
||||
export * from './deprecated';
|
||||
export { HttpPostIngressEventPublisher } from './service/http';
|
||||
export { DefaultEventBroker } from './service/DefaultEventBroker';
|
||||
|
||||
@@ -85,15 +85,15 @@ describe('DefaultEventBroker', () => {
|
||||
}
|
||||
})();
|
||||
|
||||
const errorSpy = jest.spyOn(logger, 'error');
|
||||
const warnSpy = jest.spyOn(logger, 'warn');
|
||||
const eventBroker = new DefaultEventBroker(logger);
|
||||
|
||||
eventBroker.subscribe(subscriber1);
|
||||
await eventBroker.publish({ topic, eventPayload: '1' });
|
||||
|
||||
expect(errorSpy).toHaveBeenCalledTimes(1);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber1" failed to process event',
|
||||
expect(warnSpy).toHaveBeenCalledTimes(1);
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber1" failed to process event for topic "testTopic"',
|
||||
new Error('NOPE 1'),
|
||||
);
|
||||
|
||||
@@ -101,13 +101,13 @@ describe('DefaultEventBroker', () => {
|
||||
await eventBroker.publish({ topic, eventPayload: '2' });
|
||||
|
||||
// With two subscribers we should not halt on the first error but call all subscribers
|
||||
expect(errorSpy).toHaveBeenCalledTimes(3);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber1" failed to process event',
|
||||
expect(warnSpy).toHaveBeenCalledTimes(3);
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber1" failed to process event for topic "testTopic"',
|
||||
new Error('NOPE 2'),
|
||||
);
|
||||
expect(errorSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber2" failed to process event',
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "Subscriber2" failed to process event for topic "testTopic"',
|
||||
new Error('NOPE 2'),
|
||||
);
|
||||
});
|
||||
|
||||
@@ -14,12 +14,14 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import {
|
||||
DefaultEventsService,
|
||||
EventBroker,
|
||||
EventParams,
|
||||
EventsService,
|
||||
EventSubscriber,
|
||||
} from '@backstage/plugin-events-node';
|
||||
import { Logger } from 'winston';
|
||||
|
||||
/**
|
||||
* In process event broker which will pass the event to all registered subscribers
|
||||
@@ -27,44 +29,34 @@ import { Logger } from 'winston';
|
||||
* Events will not be persisted in any form.
|
||||
*
|
||||
* @public
|
||||
* @deprecated use `DefaultEventsService` from `@backstage/plugin-events-node` instead
|
||||
*/
|
||||
// TODO(pjungermann): add prom metrics? (see plugins/catalog-backend/src/util/metrics.ts, etc.)
|
||||
export class DefaultEventBroker implements EventBroker {
|
||||
constructor(private readonly logger: Logger) {}
|
||||
private readonly events: EventsService;
|
||||
|
||||
private readonly subscribers: {
|
||||
[topic: string]: EventSubscriber[];
|
||||
} = {};
|
||||
/**
|
||||
*
|
||||
* @param logger - logger
|
||||
* @param events - replacement that gets wrapped to support not yet migrated implementations.
|
||||
* An instance can be passed (required for a mixed mode), otherwise a new instance gets created internally.
|
||||
* @deprecated use `DefaultEventsService` directly instead
|
||||
*/
|
||||
constructor(logger: LoggerService, events?: EventsService) {
|
||||
this.events = events ?? DefaultEventsService.create({ logger });
|
||||
}
|
||||
|
||||
async publish(params: EventParams): Promise<void> {
|
||||
this.logger.debug(
|
||||
`Event received: topic=${params.topic}, metadata=${JSON.stringify(
|
||||
params.metadata,
|
||||
)}, payload=${JSON.stringify(params.eventPayload)}`,
|
||||
);
|
||||
|
||||
const subscribed = this.subscribers[params.topic] ?? [];
|
||||
await Promise.all(
|
||||
subscribed.map(async subscriber => {
|
||||
try {
|
||||
await subscriber.onEvent(params);
|
||||
} catch (error) {
|
||||
this.logger.error(
|
||||
`Subscriber "${subscriber.constructor.name}" failed to process event`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
}),
|
||||
);
|
||||
return this.events.publish(params);
|
||||
}
|
||||
|
||||
subscribe(
|
||||
...subscribers: Array<EventSubscriber | Array<EventSubscriber>>
|
||||
): void {
|
||||
subscribers.flat().forEach(subscriber => {
|
||||
subscriber.supportsEventTopics().forEach(topic => {
|
||||
this.subscribers[topic] = this.subscribers[topic] ?? [];
|
||||
this.subscribers[topic].push(subscriber);
|
||||
subscribers.flat().forEach(async subscriber => {
|
||||
await this.events.subscribe({
|
||||
id: subscriber.constructor.name,
|
||||
topics: subscriber.supportsEventTopics(),
|
||||
onEvent: subscriber.onEvent.bind(subscriber),
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import { DefaultEventBroker } from './DefaultEventBroker';
|
||||
* A builder that helps wire up all component parts of the event management.
|
||||
*
|
||||
* @public
|
||||
* @deprecated `EventBroker`, `EventPublisher`, and `EventSubscriber` got replaced by `EventsService` and its methods.
|
||||
*/
|
||||
export class EventsBackend {
|
||||
private eventBroker: EventBroker;
|
||||
|
||||
@@ -13,15 +13,15 @@ import { HttpPostIngressOptions } from '@backstage/plugin-events-node';
|
||||
export interface EventsExtensionPoint {
|
||||
// (undocumented)
|
||||
addHttpPostIngress(options: HttpPostIngressOptions): void;
|
||||
// (undocumented)
|
||||
// @deprecated (undocumented)
|
||||
addPublishers(
|
||||
...publishers: Array<EventPublisher | Array<EventPublisher>>
|
||||
): void;
|
||||
// (undocumented)
|
||||
// @deprecated (undocumented)
|
||||
addSubscribers(
|
||||
...subscribers: Array<EventSubscriber | Array<EventSubscriber>>
|
||||
): void;
|
||||
// (undocumented)
|
||||
// @deprecated (undocumented)
|
||||
setEventBroker(eventBroker: EventBroker): void;
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,21 @@
|
||||
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
|
||||
|
||||
```ts
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { ServiceRef } from '@backstage/backend-plugin-api';
|
||||
|
||||
// @public
|
||||
export class DefaultEventsService implements EventsService {
|
||||
// (undocumented)
|
||||
static create(options: { logger: LoggerService }): DefaultEventsService;
|
||||
forPlugin(pluginId: string): EventsService;
|
||||
// (undocumented)
|
||||
publish(params: EventParams): Promise<void>;
|
||||
// (undocumented)
|
||||
subscribe(options: EventsServiceSubscribeOptions): Promise<void>;
|
||||
}
|
||||
|
||||
// @public @deprecated
|
||||
export interface EventBroker {
|
||||
publish(params: EventParams): Promise<void>;
|
||||
subscribe(
|
||||
@@ -18,9 +32,9 @@ export interface EventParams<TPayload = unknown> {
|
||||
topic: string;
|
||||
}
|
||||
|
||||
// @public
|
||||
// @public @deprecated
|
||||
export interface EventPublisher {
|
||||
// (undocumented)
|
||||
// @deprecated (undocumented)
|
||||
setEventBroker(eventBroker: EventBroker): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -39,8 +53,29 @@ export abstract class EventRouter implements EventPublisher, EventSubscriber {
|
||||
}
|
||||
|
||||
// @public
|
||||
export interface EventsService {
|
||||
publish(params: EventParams): Promise<void>;
|
||||
subscribe(options: EventsServiceSubscribeOptions): Promise<void>;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export type EventsServiceEventHandler = (params: EventParams) => Promise<void>;
|
||||
|
||||
// @public
|
||||
export const eventsServiceRef: ServiceRef<EventsService, 'plugin'>;
|
||||
|
||||
// @public (undocumented)
|
||||
export type EventsServiceSubscribeOptions = {
|
||||
id: string;
|
||||
topics: string[];
|
||||
onEvent: EventsServiceEventHandler;
|
||||
};
|
||||
|
||||
// @public @deprecated
|
||||
export interface EventSubscriber {
|
||||
// @deprecated
|
||||
onEvent(params: EventParams): Promise<void>;
|
||||
// @deprecated
|
||||
supportsEventTopics(): string[];
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,7 @@
|
||||
"@backstage/backend-plugin-api": "workspace:^"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@backstage/backend-common": "workspace:^",
|
||||
"@backstage/cli": "workspace:^"
|
||||
},
|
||||
"files": [
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright 2022 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { getVoidLogger } from '@backstage/backend-common';
|
||||
import { DefaultEventsService } from './DefaultEventsService';
|
||||
import { EventParams } from './EventParams';
|
||||
|
||||
const logger = getVoidLogger();
|
||||
|
||||
describe('DefaultEventsService', () => {
|
||||
it('passes events to interested subscribers', async () => {
|
||||
const events = DefaultEventsService.create({ logger });
|
||||
const eventsSubscriber1: EventParams[] = [];
|
||||
const eventsSubscriber2: EventParams[] = [];
|
||||
|
||||
await events.subscribe({
|
||||
id: 'subscriber1',
|
||||
topics: ['topicA', 'topicB'],
|
||||
onEvent: async event => {
|
||||
eventsSubscriber1.push(event);
|
||||
},
|
||||
});
|
||||
await events.subscribe({
|
||||
id: 'subscriber2',
|
||||
topics: ['topicB', 'topicC'],
|
||||
onEvent: async event => {
|
||||
eventsSubscriber2.push(event);
|
||||
},
|
||||
});
|
||||
await events.publish({
|
||||
topic: 'topicA',
|
||||
eventPayload: { test: 'topicA' },
|
||||
});
|
||||
await events.publish({
|
||||
topic: 'topicB',
|
||||
eventPayload: { test: 'topicB' },
|
||||
});
|
||||
await events.publish({
|
||||
topic: 'topicC',
|
||||
eventPayload: { test: 'topicC' },
|
||||
});
|
||||
await events.publish({
|
||||
topic: 'topicD',
|
||||
eventPayload: { test: 'topicD' },
|
||||
});
|
||||
|
||||
expect(eventsSubscriber1).toEqual([
|
||||
{ topic: 'topicA', eventPayload: { test: 'topicA' } },
|
||||
{ topic: 'topicB', eventPayload: { test: 'topicB' } },
|
||||
]);
|
||||
expect(eventsSubscriber2).toEqual([
|
||||
{ topic: 'topicB', eventPayload: { test: 'topicB' } },
|
||||
{ topic: 'topicC', eventPayload: { test: 'topicC' } },
|
||||
]);
|
||||
});
|
||||
|
||||
it('logs errors from subscribers', async () => {
|
||||
const topic = 'testTopic';
|
||||
|
||||
const warnSpy = jest.spyOn(logger, 'warn');
|
||||
const events = DefaultEventsService.create({ logger });
|
||||
|
||||
await events.subscribe({
|
||||
id: 'subscriber1',
|
||||
topics: [topic],
|
||||
onEvent: event => {
|
||||
throw new Error(`NOPE ${event.eventPayload}`);
|
||||
},
|
||||
});
|
||||
await events.publish({ topic, eventPayload: '1' });
|
||||
|
||||
expect(warnSpy).toHaveBeenCalledTimes(1);
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "subscriber1" failed to process event for topic "testTopic"',
|
||||
new Error('NOPE 1'),
|
||||
);
|
||||
|
||||
await events.subscribe({
|
||||
id: 'subscriber2',
|
||||
topics: [topic],
|
||||
onEvent: event => {
|
||||
throw new Error(`NOPE ${event.eventPayload}`);
|
||||
},
|
||||
});
|
||||
await events.publish({ topic, eventPayload: '2' });
|
||||
|
||||
// With two subscribers we should not halt on the first error but call all subscribers
|
||||
expect(warnSpy).toHaveBeenCalledTimes(3);
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "subscriber1" failed to process event for topic "testTopic"',
|
||||
new Error('NOPE 2'),
|
||||
);
|
||||
expect(warnSpy).toHaveBeenCalledWith(
|
||||
'Subscriber "subscriber2" failed to process event for topic "testTopic"',
|
||||
new Error('NOPE 2'),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,104 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { EventParams } from './EventParams';
|
||||
import { EventsService, EventsServiceSubscribeOptions } from './EventsService';
|
||||
|
||||
/**
|
||||
* In-process event broker which will pass the event to all registered subscribers
|
||||
* interested in it.
|
||||
* Events will not be persisted in any form.
|
||||
* Events will not be passed to subscribers at other instances of the same cluster.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
// TODO(pjungermann): add opentelemetry? (see plugins/catalog-backend/src/util/opentelemetry.ts, etc.)
|
||||
export class DefaultEventsService implements EventsService {
|
||||
private readonly subscribers = new Map<
|
||||
string,
|
||||
Omit<EventsServiceSubscribeOptions, 'topics'>[]
|
||||
>();
|
||||
|
||||
private constructor(private readonly logger: LoggerService) {}
|
||||
|
||||
static create(options: { logger: LoggerService }): DefaultEventsService {
|
||||
return new DefaultEventsService(options.logger);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a plugin-scoped context of the `EventService`
|
||||
* that ensures to prefix subscriber IDs with the plugin ID.
|
||||
*
|
||||
* @param pluginId - The plugin that the `EventService` should be created for.
|
||||
*/
|
||||
forPlugin(pluginId: string): EventsService {
|
||||
return {
|
||||
publish: (params: EventParams): Promise<void> => {
|
||||
return this.publish(params);
|
||||
},
|
||||
subscribe: (options: EventsServiceSubscribeOptions): Promise<void> => {
|
||||
return this.subscribe({
|
||||
...options,
|
||||
id: `${pluginId}.${options.id}`,
|
||||
});
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async publish(params: EventParams): Promise<void> {
|
||||
this.logger.debug(
|
||||
`Event received: topic=${params.topic}, metadata=${JSON.stringify(
|
||||
params.metadata,
|
||||
)}, payload=${JSON.stringify(params.eventPayload)}`,
|
||||
);
|
||||
|
||||
if (!this.subscribers.has(params.topic)) {
|
||||
return;
|
||||
}
|
||||
|
||||
const onEventPromises: Promise<void>[] = [];
|
||||
this.subscribers.get(params.topic)?.forEach(subscription => {
|
||||
onEventPromises.push(
|
||||
(async () => {
|
||||
try {
|
||||
await subscription.onEvent(params);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`Subscriber "${subscription.id}" failed to process event for topic "${params.topic}"`,
|
||||
error,
|
||||
);
|
||||
}
|
||||
})(),
|
||||
);
|
||||
});
|
||||
|
||||
await Promise.all(onEventPromises);
|
||||
}
|
||||
|
||||
async subscribe(options: EventsServiceSubscribeOptions): Promise<void> {
|
||||
options.topics.forEach(topic => {
|
||||
if (!this.subscribers.has(topic)) {
|
||||
this.subscribers.set(topic, []);
|
||||
}
|
||||
|
||||
this.subscribers.get(topic)!.push({
|
||||
id: options.id,
|
||||
onEvent: options.onEvent,
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -23,6 +23,7 @@ import { EventSubscriber } from './EventSubscriber';
|
||||
* others can subscribe for future events for topics they are interested in.
|
||||
*
|
||||
* @public
|
||||
* @deprecated use `EventsService` instead
|
||||
*/
|
||||
export interface EventBroker {
|
||||
/**
|
||||
|
||||
@@ -23,7 +23,11 @@ import { EventBroker } from './EventBroker';
|
||||
* or from event brokers, queues, etc.
|
||||
*
|
||||
* @public
|
||||
* @deprecated use the `EventsService` via the constructor, setter, or other means instead
|
||||
*/
|
||||
export interface EventPublisher {
|
||||
/**
|
||||
* @deprecated use the `EventsService` via the constructor, setter, or other means instead
|
||||
*/
|
||||
setEventBroker(eventBroker: EventBroker): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -22,10 +22,13 @@ import { EventParams } from './EventParams';
|
||||
* or other actions to react on events.
|
||||
*
|
||||
* @public
|
||||
* @deprecated use the `EventsService` via the constructor, setter, or other means instead
|
||||
*/
|
||||
export interface EventSubscriber {
|
||||
/**
|
||||
* Supported event topics like "github", "bitbucketCloud", etc.
|
||||
*
|
||||
* @deprecated use the `EventsService` via the constructor, setter, or other means instead
|
||||
*/
|
||||
supportsEventTopics(): string[];
|
||||
|
||||
@@ -33,6 +36,7 @@ export interface EventSubscriber {
|
||||
* React on a received event.
|
||||
*
|
||||
* @param params - parameters for the to be received event.
|
||||
* @deprecated you are not required to expose this anymore when using `EventsService`
|
||||
*/
|
||||
onEvent(params: EventParams): Promise<void>;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { EventParams } from './EventParams';
|
||||
|
||||
/**
|
||||
* Allows a decoupled and asynchronous communication between components.
|
||||
* Components can publish events for a given topic and
|
||||
* others can subscribe for future events for topics they are interested in.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export interface EventsService {
|
||||
/**
|
||||
* Publishes an event for the topic.
|
||||
*
|
||||
* @param params - parameters for the to be published event.
|
||||
*/
|
||||
publish(params: EventParams): Promise<void>;
|
||||
|
||||
/**
|
||||
* Subscribes to one or more topics, registering an event handler for them.
|
||||
*
|
||||
* @param options - event subscription options.
|
||||
*/
|
||||
subscribe(options: EventsServiceSubscribeOptions): Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export type EventsServiceSubscribeOptions = {
|
||||
/**
|
||||
* Identifier for the subscription. E.g., used as part of log messages.
|
||||
*/
|
||||
id: string;
|
||||
topics: string[];
|
||||
onEvent: EventsServiceEventHandler;
|
||||
};
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export type EventsServiceEventHandler = (params: EventParams) => Promise<void>;
|
||||
@@ -14,10 +14,13 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export type { EventBroker } from './EventBroker';
|
||||
export type { EventParams } from './EventParams';
|
||||
export type { EventPublisher } from './EventPublisher';
|
||||
export { EventRouter } from './EventRouter';
|
||||
export type { EventSubscriber } from './EventSubscriber';
|
||||
export type {
|
||||
EventsService,
|
||||
EventsServiceSubscribeOptions,
|
||||
EventsServiceEventHandler,
|
||||
} from './EventsService';
|
||||
export { DefaultEventsService } from './DefaultEventsService';
|
||||
export * from './http';
|
||||
export { SubTopicEventRouter } from './SubTopicEventRouter';
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
export type { EventBroker } from './api/EventBroker';
|
||||
export type { EventPublisher } from './api/EventPublisher';
|
||||
export type { EventSubscriber } from './api/EventSubscriber';
|
||||
@@ -26,12 +26,21 @@ import {
|
||||
* @alpha
|
||||
*/
|
||||
export interface EventsExtensionPoint {
|
||||
/**
|
||||
* @deprecated use `eventsServiceRef` and `eventsServiceFactory` instead
|
||||
*/
|
||||
setEventBroker(eventBroker: EventBroker): void;
|
||||
|
||||
/**
|
||||
* @deprecated use `EventsService.publish` instead
|
||||
*/
|
||||
addPublishers(
|
||||
...publishers: Array<EventPublisher | Array<EventPublisher>>
|
||||
): void;
|
||||
|
||||
/**
|
||||
* @deprecated use `EventsService.subscribe` instead
|
||||
*/
|
||||
addSubscribers(
|
||||
...subscribers: Array<EventSubscriber | Array<EventSubscriber>>
|
||||
): void;
|
||||
|
||||
@@ -21,3 +21,5 @@
|
||||
*/
|
||||
|
||||
export * from './api';
|
||||
export * from './deprecated';
|
||||
export { eventsServiceRef } from './service';
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import {
|
||||
coreServices,
|
||||
createServiceFactory,
|
||||
createServiceRef,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { EventsService, DefaultEventsService } from './api';
|
||||
|
||||
/**
|
||||
* The {@link EventsService} that allows to publish events, and subscribe to topics.
|
||||
* Uses the `root` scope so that events can be shared across all plugins, modules, and more.
|
||||
*
|
||||
* @public
|
||||
*/
|
||||
export const eventsServiceRef = createServiceRef<EventsService>({
|
||||
id: 'events.service',
|
||||
scope: 'plugin',
|
||||
defaultFactory: async service =>
|
||||
createServiceFactory({
|
||||
service,
|
||||
deps: {
|
||||
pluginMetadata: coreServices.pluginMetadata,
|
||||
rootLogger: coreServices.rootLogger,
|
||||
},
|
||||
async createRootContext({ rootLogger }) {
|
||||
return DefaultEventsService.create({ logger: rootLogger });
|
||||
},
|
||||
async factory({ pluginMetadata }, eventsService) {
|
||||
return eventsService.forPlugin(pluginMetadata.getId());
|
||||
},
|
||||
}),
|
||||
});
|
||||
@@ -6495,6 +6495,7 @@ __metadata:
|
||||
version: 0.0.0-use.local
|
||||
resolution: "@backstage/plugin-events-node@workspace:plugins/events-node"
|
||||
dependencies:
|
||||
"@backstage/backend-common": "workspace:^"
|
||||
"@backstage/backend-plugin-api": "workspace:^"
|
||||
"@backstage/cli": "workspace:^"
|
||||
languageName: unknown
|
||||
|
||||
Reference in New Issue
Block a user