feat(signals,events)!: migrate signals to use events service

+ allow defining event paylod in events service

Signed-off-by: Heikki Hellgren <heikki.hellgren@op.fi>
This commit is contained in:
Heikki Hellgren
2024-02-28 08:12:12 +02:00
parent 1d1c8a79fb
commit daf85dc4ab
16 changed files with 72 additions and 64 deletions
+7
View File
@@ -0,0 +1,7 @@
---
'@backstage/plugin-notifications-backend': minor
'@backstage/plugin-signals-backend': minor
'@backstage/plugin-signals-node': minor
---
BREAKING CHANGE: Migrates signals to use the `EventsService` and makes it mandatory
+1 -1
View File
@@ -106,7 +106,7 @@ function makeCreateEnv(config: Config) {
eventsService,
);
const signalService = DefaultSignalService.create({
eventBroker,
events: eventsService,
});
root.info(`Created UrlReader ${reader}`);
+1 -1
View File
@@ -22,7 +22,7 @@ export default async function createPlugin(
): Promise<Router> {
return await createRouter({
logger: env.logger,
eventBroker: env.eventBroker,
events: env.events,
identity: env.identity,
discovery: env.discovery,
});
@@ -33,9 +33,9 @@ import {
} from '@backstage/catalog-client';
import { DefaultSignalService } from '@backstage/plugin-signals-node';
import {
EventBroker,
EventParams,
EventSubscriber,
EventsService,
EventsServiceSubscribeOptions,
} from '@backstage/plugin-events-node';
export interface ServerOptions {
@@ -96,19 +96,17 @@ export async function startStandaloneServer(
},
};
const mockSubscribers: EventSubscriber[] = [];
const eventBroker: EventBroker = {
const mockSubscribers: EventsServiceSubscribeOptions[] = [];
const events: EventsService = {
async publish(params: EventParams): Promise<void> {
mockSubscribers.forEach(sub => sub.onEvent(params));
},
subscribe(...subscribers: EventSubscriber[]) {
subscribers.flat().forEach(subscriber => {
mockSubscribers.push(subscriber);
});
async subscribe(subscription: EventsServiceSubscribeOptions) {
mockSubscribers.push(subscription);
},
};
const signalService = DefaultSignalService.create({ eventBroker });
const signalService = DefaultSignalService.create({ events });
const router = await createRouter({
logger,
+2 -2
View File
@@ -5,7 +5,7 @@
```ts
import { AuthService } from '@backstage/backend-plugin-api';
import { BackendFeature } from '@backstage/backend-plugin-api';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsService } from '@backstage/plugin-events-node';
import express from 'express';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { LoggerService } from '@backstage/backend-plugin-api';
@@ -22,7 +22,7 @@ export interface RouterOptions {
// (undocumented)
discovery: PluginEndpointDiscovery;
// (undocumented)
eventBroker?: EventBroker;
events: EventsService;
// (undocumented)
identity: IdentityApi;
// (undocumented)
+12 -3
View File
@@ -18,6 +18,7 @@ import {
createBackendPlugin,
} from '@backstage/backend-plugin-api';
import { createRouter } from './service/router';
import { eventsServiceRef } from '@backstage/plugin-events-node';
/**
* Signals backend plugin
@@ -35,10 +36,17 @@ export const signalsPlugin = createBackendPlugin({
discovery: coreServices.discovery,
userInfo: coreServices.userInfo,
auth: coreServices.auth,
// TODO: EventBroker. It is optional for now but it's actually required so waiting for the new backend system
// for the events-backend for this to work.
events: eventsServiceRef,
},
async init({ httpRouter, logger, identity, discovery, userInfo, auth }) {
async init({
httpRouter,
logger,
identity,
discovery,
userInfo,
auth,
events,
}) {
httpRouter.use(
await createRouter({
logger,
@@ -46,6 +54,7 @@ export const signalsPlugin = createBackendPlugin({
discovery,
userInfo,
auth,
events,
}),
);
},
@@ -14,7 +14,7 @@
* limitations under the License.
*/
import { WebSocket } from 'ws';
import { EventSubscriber } from '@backstage/plugin-events-node';
import { EventsServiceSubscribeOptions } from '@backstage/plugin-events-node';
import { SignalManager } from './SignalManager';
import { getVoidLogger } from '@backstage/backend-common';
@@ -56,15 +56,15 @@ class MockWebSocket {
describe('SignalManager', () => {
let onEvent: Function;
const mockEventBroker = {
const mockEvents = {
publish: async () => {},
subscribe: (subscriber: EventSubscriber) => {
subscribe: async (subscriber: EventsServiceSubscribeOptions) => {
onEvent = subscriber.onEvent;
},
};
const manager = SignalManager.create({
eventBroker: mockEventBroker,
events: mockEvents,
logger: getVoidLogger(),
});
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { EventBroker, EventParams } from '@backstage/plugin-events-node';
import { EventParams, EventsService } from '@backstage/plugin-events-node';
import { SignalPayload } from '@backstage/plugin-signals-node';
import { RawData, WebSocket } from 'ws';
import { v4 as uuid } from 'uuid';
@@ -38,8 +38,7 @@ export type SignalConnection = {
* @internal
*/
export type SignalManagerOptions = {
// TODO: Remove optional when events-backend can offer this service
eventBroker?: EventBroker;
events: EventsService;
logger: LoggerService;
};
@@ -49,7 +48,7 @@ export class SignalManager {
string,
SignalConnection
>();
private eventBroker?: EventBroker;
private events: EventsService;
private logger: LoggerService;
static create(options: SignalManagerOptions) {
@@ -57,12 +56,13 @@ export class SignalManager {
}
private constructor(options: SignalManagerOptions) {
({ eventBroker: this.eventBroker, logger: this.logger } = options);
({ events: this.events, logger: this.logger } = options);
this.eventBroker?.subscribe({
supportsEventTopics: () => ['signals'],
onEvent: (params: EventParams<SignalPayload>) =>
this.onEventBrokerEvent(params),
this.events.subscribe({
id: 'signals',
topics: ['signals'],
onEvent: (params: EventParams) =>
this.onEventBrokerEvent(params.eventPayload as SignalPayload),
});
}
@@ -126,10 +126,7 @@ export class SignalManager {
}
}
private async onEventBrokerEvent(
params: EventParams<SignalPayload>,
): Promise<void> {
const { eventPayload } = params;
private async onEventBrokerEvent(eventPayload: SignalPayload): Promise<void> {
if (!eventPayload.channel || !eventPayload.message) {
return;
}
@@ -21,11 +21,11 @@ import express from 'express';
import request from 'supertest';
import { createRouter } from './router';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsService } from '@backstage/plugin-events-node';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { UserInfoService } from '@backstage/backend-plugin-api';
const eventBrokerMock: jest.Mocked<EventBroker> = {
const eventsServiceMock: jest.Mocked<EventsService> = {
subscribe: jest.fn(),
publish: jest.fn(),
};
@@ -50,7 +50,7 @@ describe('createRouter', () => {
const router = await createRouter({
logger: getVoidLogger(),
identity: identityApiMock,
eventBroker: eventBrokerMock,
events: eventsServiceMock,
discovery,
userInfo,
});
@@ -30,14 +30,14 @@ import * as https from 'https';
import http, { IncomingMessage } from 'http';
import { SignalManager } from './SignalManager';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsService } from '@backstage/plugin-events-node';
import { WebSocket, WebSocketServer } from 'ws';
import { Duplex } from 'stream';
/** @public */
export interface RouterOptions {
logger: LoggerService;
eventBroker?: EventBroker;
events: EventsService;
identity: IdentityApi;
discovery: PluginEndpointDiscovery;
auth?: AuthService;
@@ -24,9 +24,9 @@ import { createRouter } from './router';
import { DefaultSignalService } from '@backstage/plugin-signals-node';
import { DefaultIdentityClient } from '@backstage/plugin-auth-node';
import {
EventBroker,
EventParams,
EventSubscriber,
EventsService,
EventsServiceSubscribeOptions,
} from '@backstage/plugin-events-node';
import {
BackstageCredentials,
@@ -53,20 +53,18 @@ export async function startStandaloneServer(
issuer: await discovery.getExternalBaseUrl('auth'),
});
const mockSubscribers: EventSubscriber[] = [];
const eventBroker: EventBroker = {
const mockSubscribers: EventsServiceSubscribeOptions[] = [];
const events: EventsService = {
async publish(params: EventParams): Promise<void> {
mockSubscribers.forEach(sub => sub.onEvent(params));
},
subscribe(...subscribers: EventSubscriber[]) {
subscribers.flat().forEach(subscriber => {
mockSubscribers.push(subscriber);
});
async subscribe(subscription: EventsServiceSubscribeOptions) {
mockSubscribers.push(subscription);
},
};
const signals = DefaultSignalService.create({
eventBroker,
events,
});
const userInfo: UserInfoService = {
@@ -81,7 +79,7 @@ export async function startStandaloneServer(
const router = await createRouter({
logger,
identity,
eventBroker,
events,
discovery,
userInfo,
});
+2 -2
View File
@@ -3,7 +3,7 @@
> Do not edit this file. It is a report generated by [API Extractor](https://api-extractor.com/).
```ts
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsService } from '@backstage/plugin-events-node';
import { JsonObject } from '@backstage/types';
import { ServiceRef } from '@backstage/backend-plugin-api';
@@ -35,7 +35,7 @@ export const signalService: ServiceRef<SignalService, 'plugin'>;
// @public (undocumented)
export type SignalServiceOptions = {
eventBroker?: EventBroker;
events: EventsService;
};
// (No @packageDocumentation comment for this package)
@@ -16,12 +16,12 @@
import { DefaultSignalService } from './DefaultSignalService';
describe('DefaultSignalService', () => {
const mockEventBroker = {
const mockEvents = {
publish: jest.fn(),
subscribe: jest.fn(),
};
const service = DefaultSignalService.create({ eventBroker: mockEventBroker });
const service = DefaultSignalService.create({ events: mockEvents });
it('should publish signal', () => {
const signal = {
@@ -30,7 +30,7 @@ describe('DefaultSignalService', () => {
message: { msg: 'hello world' },
};
service.publish(signal);
expect(mockEventBroker.publish).toHaveBeenCalledWith({
expect(mockEvents.publish).toHaveBeenCalledWith({
topic: 'signals',
eventPayload: signal,
});
@@ -13,22 +13,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsService } from '@backstage/plugin-events-node';
import { SignalPayload, SignalServiceOptions } from './types';
import { SignalService } from './SignalService';
import { JsonObject } from '@backstage/types';
/** @public */
export class DefaultSignalService implements SignalService {
// TODO: Remove this to be optional when events-backend has eventBroker as service
private eventBroker?: EventBroker;
private events: EventsService;
static create(options: SignalServiceOptions) {
return new DefaultSignalService(options);
}
private constructor(options: SignalServiceOptions) {
({ eventBroker: this.eventBroker } = options);
({ events: this.events } = options);
}
/**
@@ -38,7 +37,7 @@ export class DefaultSignalService implements SignalService {
async publish<TMessage extends JsonObject = JsonObject>(
signal: SignalPayload<TMessage>,
) {
await this.eventBroker?.publish({
await this.events.publish({
topic: 'signals',
eventPayload: signal,
});
+4 -4
View File
@@ -19,6 +19,7 @@ import {
} from '@backstage/backend-plugin-api';
import { DefaultSignalService } from './DefaultSignalService';
import { SignalService } from './SignalService';
import { eventsServiceRef } from '@backstage/plugin-events-node';
/** @public */
export const signalService = createServiceRef<SignalService>({
@@ -28,11 +29,10 @@ export const signalService = createServiceRef<SignalService>({
createServiceFactory({
service,
deps: {
// TODO: EventBroker. It is optional for now but it's actually required so waiting for the new backend system
// for the events-backend for this to work.
events: eventsServiceRef,
},
factory({}) {
return DefaultSignalService.create({});
factory({ events }) {
return DefaultSignalService.create({ events });
},
}),
});
+2 -2
View File
@@ -13,14 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { EventBroker } from '@backstage/plugin-events-node';
import { EventsService } from '@backstage/plugin-events-node';
import { JsonObject } from '@backstage/types';
/**
* @public
*/
export type SignalServiceOptions = {
eventBroker?: EventBroker;
events: EventsService;
};
/** @public */