feat: improve signal lifecycle management + server side pinging
Signed-off-by: Heikki Hellgren <heikki.hellgren@op.fi>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-signals-backend': patch
|
||||
---
|
||||
|
||||
Improved signal lifecycle management and added server side pinging of connections
|
||||
@@ -25,5 +25,6 @@ export default async function createPlugin(
|
||||
events: env.events,
|
||||
identity: env.identity,
|
||||
discovery: env.discovery,
|
||||
config: env.config,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -5,9 +5,11 @@
|
||||
```ts
|
||||
import { AuthService } from '@backstage/backend-plugin-api';
|
||||
import { BackendFeature } from '@backstage/backend-plugin-api';
|
||||
import { Config } from '@backstage/config';
|
||||
import { EventsService } from '@backstage/plugin-events-node';
|
||||
import express from 'express';
|
||||
import { IdentityApi } from '@backstage/plugin-auth-node';
|
||||
import { LifecycleService } from '@backstage/backend-plugin-api';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { PluginEndpointDiscovery } from '@backstage/backend-common';
|
||||
import { UserInfoService } from '@backstage/backend-plugin-api';
|
||||
@@ -20,12 +22,16 @@ export interface RouterOptions {
|
||||
// (undocumented)
|
||||
auth?: AuthService;
|
||||
// (undocumented)
|
||||
config: Config;
|
||||
// (undocumented)
|
||||
discovery: PluginEndpointDiscovery;
|
||||
// (undocumented)
|
||||
events: EventsService;
|
||||
// (undocumented)
|
||||
identity: IdentityApi;
|
||||
// (undocumented)
|
||||
lifecycle?: LifecycleService;
|
||||
// (undocumented)
|
||||
logger: LoggerService;
|
||||
// (undocumented)
|
||||
userInfo?: UserInfoService;
|
||||
|
||||
@@ -41,7 +41,7 @@
|
||||
"node-fetch": "^2.6.7",
|
||||
"uuid": "^9.0.0",
|
||||
"winston": "^3.2.1",
|
||||
"ws": "^8.14.2",
|
||||
"ws": "^8.17.0",
|
||||
"yn": "^4.0.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
@@ -51,6 +51,7 @@
|
||||
"@backstage/plugin-auth-backend-module-guest-provider": "workspace:^",
|
||||
"@backstage/plugin-events-backend": "workspace:^",
|
||||
"@types/supertest": "^2.0.8",
|
||||
"@types/ws": "^8.5.10",
|
||||
"msw": "^1.0.0",
|
||||
"supertest": "^6.2.4"
|
||||
},
|
||||
|
||||
@@ -32,6 +32,8 @@ export const signalsPlugin = createBackendPlugin({
|
||||
deps: {
|
||||
httpRouter: coreServices.httpRouter,
|
||||
logger: coreServices.logger,
|
||||
config: coreServices.rootConfig,
|
||||
lifecycle: coreServices.rootLifecycle,
|
||||
identity: coreServices.identity,
|
||||
discovery: coreServices.discovery,
|
||||
userInfo: coreServices.userInfo,
|
||||
@@ -41,6 +43,8 @@ export const signalsPlugin = createBackendPlugin({
|
||||
async init({
|
||||
httpRouter,
|
||||
logger,
|
||||
config,
|
||||
lifecycle,
|
||||
identity,
|
||||
discovery,
|
||||
userInfo,
|
||||
@@ -50,7 +54,9 @@ export const signalsPlugin = createBackendPlugin({
|
||||
httpRouter.use(
|
||||
await createRouter({
|
||||
logger,
|
||||
config,
|
||||
identity,
|
||||
lifecycle,
|
||||
discovery,
|
||||
userInfo,
|
||||
auth,
|
||||
|
||||
@@ -17,6 +17,7 @@ import { WebSocket } from 'ws';
|
||||
import { EventsServiceSubscribeOptions } from '@backstage/plugin-events-node';
|
||||
import { SignalManager } from './SignalManager';
|
||||
import { getVoidLogger } from '@backstage/backend-common';
|
||||
import { ConfigReader } from '@backstage/config';
|
||||
|
||||
class MockWebSocket {
|
||||
closed: boolean = false;
|
||||
@@ -30,6 +31,11 @@ class MockWebSocket {
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
terminate(): void {
|
||||
this.readyState = WebSocket.CLOSED;
|
||||
this.closed = true;
|
||||
}
|
||||
|
||||
on(
|
||||
event: string | symbol,
|
||||
listener: (this: WebSocket, ...args: any[]) => void,
|
||||
@@ -63,9 +69,23 @@ describe('SignalManager', () => {
|
||||
},
|
||||
};
|
||||
|
||||
const shutdownHooks: Function[] = [];
|
||||
const mockLifecycle = {
|
||||
addShutdownHook: (hook: Function) => shutdownHooks.push(hook),
|
||||
};
|
||||
|
||||
const manager = SignalManager.create({
|
||||
events: mockEvents,
|
||||
logger: getVoidLogger(),
|
||||
config: new ConfigReader({}),
|
||||
lifecycle: mockLifecycle as any,
|
||||
});
|
||||
|
||||
it('should close all connections when server is closed', () => {
|
||||
const ws = new MockWebSocket();
|
||||
manager.addConnection(ws as unknown as WebSocket);
|
||||
shutdownHooks.forEach(hook => hook());
|
||||
expect(ws.closed).toBeTruthy();
|
||||
});
|
||||
|
||||
it('should close connection on error', () => {
|
||||
|
||||
@@ -20,8 +20,10 @@ import { v4 as uuid } from 'uuid';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
import {
|
||||
BackstageUserInfo,
|
||||
LifecycleService,
|
||||
LoggerService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { Config } from '@backstage/config';
|
||||
|
||||
/**
|
||||
* @internal
|
||||
@@ -32,6 +34,7 @@ export type SignalConnection = {
|
||||
ws: WebSocket;
|
||||
ownershipEntityRefs: string[];
|
||||
subscriptions: Set<string>;
|
||||
isAlive: boolean;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -39,7 +42,9 @@ export type SignalConnection = {
|
||||
*/
|
||||
export type SignalManagerOptions = {
|
||||
events: EventsService;
|
||||
config: Config;
|
||||
logger: LoggerService;
|
||||
lifecycle?: LifecycleService;
|
||||
};
|
||||
|
||||
/** @internal */
|
||||
@@ -50,6 +55,7 @@ export class SignalManager {
|
||||
>();
|
||||
private events: EventsService;
|
||||
private logger: LoggerService;
|
||||
private pingInterval: ReturnType<typeof setInterval> | undefined;
|
||||
|
||||
static create(options: SignalManagerOptions) {
|
||||
return new SignalManager(options);
|
||||
@@ -64,11 +70,43 @@ export class SignalManager {
|
||||
onEvent: (params: EventParams) =>
|
||||
this.onEventBrokerEvent(params.eventPayload as SignalPayload),
|
||||
});
|
||||
|
||||
options.lifecycle?.addShutdownHook(() => this.onShutdown());
|
||||
}
|
||||
|
||||
private ping() {
|
||||
this.connections.forEach(conn => {
|
||||
if (!conn.isAlive) {
|
||||
this.logger.debug(`Connection ${conn.id} is not alive, terminating`);
|
||||
conn.ws.terminate();
|
||||
return;
|
||||
}
|
||||
|
||||
conn.isAlive = false;
|
||||
conn.ws.ping();
|
||||
});
|
||||
}
|
||||
|
||||
private onShutdown() {
|
||||
if (this.pingInterval) {
|
||||
clearInterval(this.pingInterval);
|
||||
}
|
||||
|
||||
// TODO: Unsubscribe from events?
|
||||
|
||||
this.connections.forEach(conn => {
|
||||
conn.ws.terminate();
|
||||
});
|
||||
this.connections.clear();
|
||||
}
|
||||
|
||||
addConnection(ws: WebSocket, identity?: BackstageUserInfo) {
|
||||
const id = uuid();
|
||||
// Start pinging on first connection
|
||||
if (!this.pingInterval) {
|
||||
this.pingInterval = setInterval(() => this.ping(), 30000);
|
||||
}
|
||||
|
||||
const id = uuid();
|
||||
const conn = {
|
||||
id,
|
||||
user: identity?.userEntityRef ?? 'user:default/guest',
|
||||
@@ -77,25 +115,37 @@ export class SignalManager {
|
||||
'user:default/guest',
|
||||
],
|
||||
subscriptions: new Set<string>(),
|
||||
isAlive: true,
|
||||
};
|
||||
|
||||
this.connections.set(id, conn);
|
||||
|
||||
this.logger.debug(`Connection ${id} connected`);
|
||||
ws.on('error', (err: Error) => {
|
||||
this.logger.error(
|
||||
`Error occurred with connection ${id}: ${err}, closing connection`,
|
||||
);
|
||||
ws.close();
|
||||
ws.terminate();
|
||||
this.connections.delete(id);
|
||||
});
|
||||
|
||||
ws.on('close', (code: number, reason: Buffer) => {
|
||||
this.logger.info(
|
||||
this.logger.debug(
|
||||
`Connection ${id} closed with code ${code}, reason: ${reason}`,
|
||||
);
|
||||
ws.terminate();
|
||||
this.connections.delete(id);
|
||||
});
|
||||
|
||||
ws.on('ping', () => {
|
||||
conn.isAlive = true;
|
||||
ws.pong();
|
||||
});
|
||||
|
||||
ws.on('pong', () => {
|
||||
conn.isAlive = true;
|
||||
});
|
||||
|
||||
ws.on('message', (data: RawData, isBinary: boolean) => {
|
||||
this.logger.debug(`Received message from connection ${id}: ${data}`);
|
||||
if (isBinary) {
|
||||
@@ -114,12 +164,12 @@ export class SignalManager {
|
||||
|
||||
private handleMessage(connection: SignalConnection, message: JsonObject) {
|
||||
if (message.action === 'subscribe' && message.channel) {
|
||||
this.logger.info(
|
||||
this.logger.debug(
|
||||
`Connection ${connection.id} subscribed to ${message.channel}`,
|
||||
);
|
||||
connection.subscriptions.add(message.channel as string);
|
||||
} else if (message.action === 'unsubscribe' && message.channel) {
|
||||
this.logger.info(
|
||||
this.logger.debug(
|
||||
`Connection ${connection.id} unsubscribed from ${message.channel}`,
|
||||
);
|
||||
connection.subscriptions.delete(message.channel as string);
|
||||
|
||||
@@ -24,6 +24,7 @@ import { createRouter } from './router';
|
||||
import { EventsService } from '@backstage/plugin-events-node';
|
||||
import { IdentityApi } from '@backstage/plugin-auth-node';
|
||||
import { UserInfoService } from '@backstage/backend-plugin-api';
|
||||
import { ConfigReader } from '@backstage/config';
|
||||
|
||||
const eventsServiceMock: jest.Mocked<EventsService> = {
|
||||
subscribe: jest.fn(),
|
||||
@@ -53,6 +54,7 @@ describe('createRouter', () => {
|
||||
events: eventsServiceMock,
|
||||
discovery,
|
||||
userInfo,
|
||||
config: new ConfigReader({}),
|
||||
});
|
||||
app = express().use(router);
|
||||
});
|
||||
|
||||
@@ -23,6 +23,7 @@ import Router from 'express-promise-router';
|
||||
import {
|
||||
AuthService,
|
||||
BackstageUserInfo,
|
||||
LifecycleService,
|
||||
LoggerService,
|
||||
UserInfoService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
@@ -33,6 +34,7 @@ import { IdentityApi } from '@backstage/plugin-auth-node';
|
||||
import { EventsService } from '@backstage/plugin-events-node';
|
||||
import { WebSocket, WebSocketServer } from 'ws';
|
||||
import { Duplex } from 'stream';
|
||||
import { Config } from '@backstage/config';
|
||||
|
||||
/** @public */
|
||||
export interface RouterOptions {
|
||||
@@ -40,6 +42,8 @@ export interface RouterOptions {
|
||||
events: EventsService;
|
||||
identity: IdentityApi;
|
||||
discovery: PluginEndpointDiscovery;
|
||||
config: Config;
|
||||
lifecycle?: LifecycleService;
|
||||
auth?: AuthService;
|
||||
userInfo?: UserInfoService;
|
||||
}
|
||||
@@ -56,16 +60,12 @@ export async function createRouter(
|
||||
let apiUrl: string | undefined = undefined;
|
||||
|
||||
const webSocketServer = new WebSocketServer({
|
||||
noServer: true,
|
||||
clientTracking: false,
|
||||
noServer: true, // handle upgrade manually
|
||||
clientTracking: false, // handle connections in SignalManager
|
||||
});
|
||||
|
||||
webSocketServer.on('error', (error: Error) => {
|
||||
logger.error('WebSocket server error', error);
|
||||
});
|
||||
|
||||
webSocketServer.on('close', () => {
|
||||
logger.info('WebSocket server closed');
|
||||
logger.error(`WebSocket server error: ${error}`);
|
||||
});
|
||||
|
||||
const handleUpgrade = async (
|
||||
@@ -94,7 +94,7 @@ export async function createRouter(
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error('Failed to authenticate WebSocket connection', e);
|
||||
logger.error(`Failed to authenticate WebSocket connection: ${e}`);
|
||||
socket.write(
|
||||
'HTTP/1.1 401 Web Socket Protocol Handshake\r\n' +
|
||||
'Upgrade: WebSocket\r\n' +
|
||||
@@ -115,7 +115,14 @@ export async function createRouter(
|
||||
},
|
||||
);
|
||||
} catch (e) {
|
||||
logger.error('Failed to handle WebSocket upgrade', e);
|
||||
logger.error(`Failed to handle WebSocket upgrade: ${e}`);
|
||||
socket.write(
|
||||
'HTTP/1.1 500 Web Socket Protocol Handshake\r\n' +
|
||||
'Upgrade: WebSocket\r\n' +
|
||||
'Connection: Upgrade\r\n' +
|
||||
'\r\n',
|
||||
);
|
||||
socket.destroy();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -145,7 +152,6 @@ export async function createRouter(
|
||||
router.use(upgradeMiddleware);
|
||||
|
||||
router.get('/health', (_, response) => {
|
||||
logger.info('PONG!');
|
||||
response.json({ status: 'ok' });
|
||||
});
|
||||
|
||||
|
||||
@@ -7271,6 +7271,7 @@ __metadata:
|
||||
"@backstage/types": "workspace:^"
|
||||
"@types/express": "*"
|
||||
"@types/supertest": ^2.0.8
|
||||
"@types/ws": ^8.5.10
|
||||
express: ^4.17.1
|
||||
express-promise-router: ^4.1.0
|
||||
http-proxy-middleware: ^2.0.0
|
||||
@@ -7279,7 +7280,7 @@ __metadata:
|
||||
supertest: ^6.2.4
|
||||
uuid: ^9.0.0
|
||||
winston: ^3.2.1
|
||||
ws: ^8.14.2
|
||||
ws: ^8.17.0
|
||||
yn: ^4.0.0
|
||||
languageName: unknown
|
||||
linkType: soft
|
||||
@@ -42139,9 +42140,9 @@ __metadata:
|
||||
languageName: node
|
||||
linkType: hard
|
||||
|
||||
"ws@npm:*, ws@npm:^8.11.0, ws@npm:^8.12.0, ws@npm:^8.13.0, ws@npm:^8.14.2, ws@npm:^8.16.0, ws@npm:^8.8.0":
|
||||
version: 8.16.0
|
||||
resolution: "ws@npm:8.16.0"
|
||||
"ws@npm:*, ws@npm:^8.11.0, ws@npm:^8.12.0, ws@npm:^8.13.0, ws@npm:^8.14.2, ws@npm:^8.16.0, ws@npm:^8.17.0, ws@npm:^8.8.0":
|
||||
version: 8.17.0
|
||||
resolution: "ws@npm:8.17.0"
|
||||
peerDependencies:
|
||||
bufferutil: ^4.0.1
|
||||
utf-8-validate: ">=5.0.2"
|
||||
@@ -42150,7 +42151,7 @@ __metadata:
|
||||
optional: true
|
||||
utf-8-validate:
|
||||
optional: true
|
||||
checksum: feb3eecd2bae82fa8a8beef800290ce437d8b8063bdc69712725f21aef77c49cb2ff45c6e5e7fce622248f9c7abaee506bae0a9064067ffd6935460c7357321b
|
||||
checksum: 147ef9eab0251364e1d2c55338ad0efb15e6913923ccbfdf20f7a8a6cb8f88432bcd7f4d8f66977135bfad35575644f9983201c1a361019594a4e53977bf6d4e
|
||||
languageName: node
|
||||
linkType: hard
|
||||
|
||||
|
||||
Reference in New Issue
Block a user