feat: improve signal lifecycle management + server side pinging

Signed-off-by: Heikki Hellgren <heikki.hellgren@op.fi>
This commit is contained in:
Heikki Hellgren
2024-05-02 09:59:37 +03:00
parent bb5b6ee309
commit 845d56a76f
10 changed files with 119 additions and 21 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-signals-backend': patch
---
Improved signal lifecycle management and added server side pinging of connections
@@ -25,5 +25,6 @@ export default async function createPlugin(
events: env.events,
identity: env.identity,
discovery: env.discovery,
config: env.config,
});
}
+6
View File
@@ -5,9 +5,11 @@
```ts
import { AuthService } from '@backstage/backend-plugin-api';
import { BackendFeature } from '@backstage/backend-plugin-api';
import { Config } from '@backstage/config';
import { EventsService } from '@backstage/plugin-events-node';
import express from 'express';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { LifecycleService } from '@backstage/backend-plugin-api';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginEndpointDiscovery } from '@backstage/backend-common';
import { UserInfoService } from '@backstage/backend-plugin-api';
@@ -20,12 +22,16 @@ export interface RouterOptions {
// (undocumented)
auth?: AuthService;
// (undocumented)
config: Config;
// (undocumented)
discovery: PluginEndpointDiscovery;
// (undocumented)
events: EventsService;
// (undocumented)
identity: IdentityApi;
// (undocumented)
lifecycle?: LifecycleService;
// (undocumented)
logger: LoggerService;
// (undocumented)
userInfo?: UserInfoService;
+2 -1
View File
@@ -41,7 +41,7 @@
"node-fetch": "^2.6.7",
"uuid": "^9.0.0",
"winston": "^3.2.1",
"ws": "^8.14.2",
"ws": "^8.17.0",
"yn": "^4.0.0"
},
"devDependencies": {
@@ -51,6 +51,7 @@
"@backstage/plugin-auth-backend-module-guest-provider": "workspace:^",
"@backstage/plugin-events-backend": "workspace:^",
"@types/supertest": "^2.0.8",
"@types/ws": "^8.5.10",
"msw": "^1.0.0",
"supertest": "^6.2.4"
},
+6
View File
@@ -32,6 +32,8 @@ export const signalsPlugin = createBackendPlugin({
deps: {
httpRouter: coreServices.httpRouter,
logger: coreServices.logger,
config: coreServices.rootConfig,
lifecycle: coreServices.rootLifecycle,
identity: coreServices.identity,
discovery: coreServices.discovery,
userInfo: coreServices.userInfo,
@@ -41,6 +43,8 @@ export const signalsPlugin = createBackendPlugin({
async init({
httpRouter,
logger,
config,
lifecycle,
identity,
discovery,
userInfo,
@@ -50,7 +54,9 @@ export const signalsPlugin = createBackendPlugin({
httpRouter.use(
await createRouter({
logger,
config,
identity,
lifecycle,
discovery,
userInfo,
auth,
@@ -17,6 +17,7 @@ import { WebSocket } from 'ws';
import { EventsServiceSubscribeOptions } from '@backstage/plugin-events-node';
import { SignalManager } from './SignalManager';
import { getVoidLogger } from '@backstage/backend-common';
import { ConfigReader } from '@backstage/config';
class MockWebSocket {
closed: boolean = false;
@@ -30,6 +31,11 @@ class MockWebSocket {
this.closed = true;
}
terminate(): void {
this.readyState = WebSocket.CLOSED;
this.closed = true;
}
on(
event: string | symbol,
listener: (this: WebSocket, ...args: any[]) => void,
@@ -63,9 +69,23 @@ describe('SignalManager', () => {
},
};
const shutdownHooks: Function[] = [];
const mockLifecycle = {
addShutdownHook: (hook: Function) => shutdownHooks.push(hook),
};
const manager = SignalManager.create({
events: mockEvents,
logger: getVoidLogger(),
config: new ConfigReader({}),
lifecycle: mockLifecycle as any,
});
it('should close all connections when server is closed', () => {
const ws = new MockWebSocket();
manager.addConnection(ws as unknown as WebSocket);
shutdownHooks.forEach(hook => hook());
expect(ws.closed).toBeTruthy();
});
it('should close connection on error', () => {
@@ -20,8 +20,10 @@ import { v4 as uuid } from 'uuid';
import { JsonObject } from '@backstage/types';
import {
BackstageUserInfo,
LifecycleService,
LoggerService,
} from '@backstage/backend-plugin-api';
import { Config } from '@backstage/config';
/**
* @internal
@@ -32,6 +34,7 @@ export type SignalConnection = {
ws: WebSocket;
ownershipEntityRefs: string[];
subscriptions: Set<string>;
isAlive: boolean;
};
/**
@@ -39,7 +42,9 @@ export type SignalConnection = {
*/
export type SignalManagerOptions = {
events: EventsService;
config: Config;
logger: LoggerService;
lifecycle?: LifecycleService;
};
/** @internal */
@@ -50,6 +55,7 @@ export class SignalManager {
>();
private events: EventsService;
private logger: LoggerService;
private pingInterval: ReturnType<typeof setInterval> | undefined;
static create(options: SignalManagerOptions) {
return new SignalManager(options);
@@ -64,11 +70,43 @@ export class SignalManager {
onEvent: (params: EventParams) =>
this.onEventBrokerEvent(params.eventPayload as SignalPayload),
});
options.lifecycle?.addShutdownHook(() => this.onShutdown());
}
private ping() {
this.connections.forEach(conn => {
if (!conn.isAlive) {
this.logger.debug(`Connection ${conn.id} is not alive, terminating`);
conn.ws.terminate();
return;
}
conn.isAlive = false;
conn.ws.ping();
});
}
private onShutdown() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
}
// TODO: Unsubscribe from events?
this.connections.forEach(conn => {
conn.ws.terminate();
});
this.connections.clear();
}
addConnection(ws: WebSocket, identity?: BackstageUserInfo) {
const id = uuid();
// Start pinging on first connection
if (!this.pingInterval) {
this.pingInterval = setInterval(() => this.ping(), 30000);
}
const id = uuid();
const conn = {
id,
user: identity?.userEntityRef ?? 'user:default/guest',
@@ -77,25 +115,37 @@ export class SignalManager {
'user:default/guest',
],
subscriptions: new Set<string>(),
isAlive: true,
};
this.connections.set(id, conn);
this.logger.debug(`Connection ${id} connected`);
ws.on('error', (err: Error) => {
this.logger.error(
`Error occurred with connection ${id}: ${err}, closing connection`,
);
ws.close();
ws.terminate();
this.connections.delete(id);
});
ws.on('close', (code: number, reason: Buffer) => {
this.logger.info(
this.logger.debug(
`Connection ${id} closed with code ${code}, reason: ${reason}`,
);
ws.terminate();
this.connections.delete(id);
});
ws.on('ping', () => {
conn.isAlive = true;
ws.pong();
});
ws.on('pong', () => {
conn.isAlive = true;
});
ws.on('message', (data: RawData, isBinary: boolean) => {
this.logger.debug(`Received message from connection ${id}: ${data}`);
if (isBinary) {
@@ -114,12 +164,12 @@ export class SignalManager {
private handleMessage(connection: SignalConnection, message: JsonObject) {
if (message.action === 'subscribe' && message.channel) {
this.logger.info(
this.logger.debug(
`Connection ${connection.id} subscribed to ${message.channel}`,
);
connection.subscriptions.add(message.channel as string);
} else if (message.action === 'unsubscribe' && message.channel) {
this.logger.info(
this.logger.debug(
`Connection ${connection.id} unsubscribed from ${message.channel}`,
);
connection.subscriptions.delete(message.channel as string);
@@ -24,6 +24,7 @@ import { createRouter } from './router';
import { EventsService } from '@backstage/plugin-events-node';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { UserInfoService } from '@backstage/backend-plugin-api';
import { ConfigReader } from '@backstage/config';
const eventsServiceMock: jest.Mocked<EventsService> = {
subscribe: jest.fn(),
@@ -53,6 +54,7 @@ describe('createRouter', () => {
events: eventsServiceMock,
discovery,
userInfo,
config: new ConfigReader({}),
});
app = express().use(router);
});
+16 -10
View File
@@ -23,6 +23,7 @@ import Router from 'express-promise-router';
import {
AuthService,
BackstageUserInfo,
LifecycleService,
LoggerService,
UserInfoService,
} from '@backstage/backend-plugin-api';
@@ -33,6 +34,7 @@ import { IdentityApi } from '@backstage/plugin-auth-node';
import { EventsService } from '@backstage/plugin-events-node';
import { WebSocket, WebSocketServer } from 'ws';
import { Duplex } from 'stream';
import { Config } from '@backstage/config';
/** @public */
export interface RouterOptions {
@@ -40,6 +42,8 @@ export interface RouterOptions {
events: EventsService;
identity: IdentityApi;
discovery: PluginEndpointDiscovery;
config: Config;
lifecycle?: LifecycleService;
auth?: AuthService;
userInfo?: UserInfoService;
}
@@ -56,16 +60,12 @@ export async function createRouter(
let apiUrl: string | undefined = undefined;
const webSocketServer = new WebSocketServer({
noServer: true,
clientTracking: false,
noServer: true, // handle upgrade manually
clientTracking: false, // handle connections in SignalManager
});
webSocketServer.on('error', (error: Error) => {
logger.error('WebSocket server error', error);
});
webSocketServer.on('close', () => {
logger.info('WebSocket server closed');
logger.error(`WebSocket server error: ${error}`);
});
const handleUpgrade = async (
@@ -94,7 +94,7 @@ export async function createRouter(
}
}
} catch (e) {
logger.error('Failed to authenticate WebSocket connection', e);
logger.error(`Failed to authenticate WebSocket connection: ${e}`);
socket.write(
'HTTP/1.1 401 Web Socket Protocol Handshake\r\n' +
'Upgrade: WebSocket\r\n' +
@@ -115,7 +115,14 @@ export async function createRouter(
},
);
} catch (e) {
logger.error('Failed to handle WebSocket upgrade', e);
logger.error(`Failed to handle WebSocket upgrade: ${e}`);
socket.write(
'HTTP/1.1 500 Web Socket Protocol Handshake\r\n' +
'Upgrade: WebSocket\r\n' +
'Connection: Upgrade\r\n' +
'\r\n',
);
socket.destroy();
}
};
@@ -145,7 +152,6 @@ export async function createRouter(
router.use(upgradeMiddleware);
router.get('/health', (_, response) => {
logger.info('PONG!');
response.json({ status: 'ok' });
});
+6 -5
View File
@@ -7271,6 +7271,7 @@ __metadata:
"@backstage/types": "workspace:^"
"@types/express": "*"
"@types/supertest": ^2.0.8
"@types/ws": ^8.5.10
express: ^4.17.1
express-promise-router: ^4.1.0
http-proxy-middleware: ^2.0.0
@@ -7279,7 +7280,7 @@ __metadata:
supertest: ^6.2.4
uuid: ^9.0.0
winston: ^3.2.1
ws: ^8.14.2
ws: ^8.17.0
yn: ^4.0.0
languageName: unknown
linkType: soft
@@ -42139,9 +42140,9 @@ __metadata:
languageName: node
linkType: hard
"ws@npm:*, ws@npm:^8.11.0, ws@npm:^8.12.0, ws@npm:^8.13.0, ws@npm:^8.14.2, ws@npm:^8.16.0, ws@npm:^8.8.0":
version: 8.16.0
resolution: "ws@npm:8.16.0"
"ws@npm:*, ws@npm:^8.11.0, ws@npm:^8.12.0, ws@npm:^8.13.0, ws@npm:^8.14.2, ws@npm:^8.16.0, ws@npm:^8.17.0, ws@npm:^8.8.0":
version: 8.17.0
resolution: "ws@npm:8.17.0"
peerDependencies:
bufferutil: ^4.0.1
utf-8-validate: ">=5.0.2"
@@ -42150,7 +42151,7 @@ __metadata:
optional: true
utf-8-validate:
optional: true
checksum: feb3eecd2bae82fa8a8beef800290ce437d8b8063bdc69712725f21aef77c49cb2ff45c6e5e7fce622248f9c7abaee506bae0a9064067ffd6935460c7357321b
checksum: 147ef9eab0251364e1d2c55338ad0efb15e6913923ccbfdf20f7a8a6cb8f88432bcd7f4d8f66977135bfad35575644f9983201c1a361019594a4e53977bf6d4e
languageName: node
linkType: hard