fix: signal disconnect loop on server start

this fixes the disconnect loop when the server has just started by
waiting for the connection to open before subsribing to events.

also includes other improvements:
- rename types to interfaces
- allow to input single receiver for signal
- use discovery api to check for upgrade path instead hard coded one

Signed-off-by: Heikki Hellgren <heikki.hellgren@op.fi>
This commit is contained in:
Heikki Hellgren
2024-01-26 09:31:06 +02:00
parent 9a8d556661
commit 447d21045b
17 changed files with 89 additions and 46 deletions
+8
View File
@@ -0,0 +1,8 @@
---
'@backstage/plugin-signals-backend': patch
'@backstage/plugin-signals-react': patch
'@backstage/plugin-signals-node': patch
'@backstage/plugin-signals': patch
---
Fix to disconnect loop on server start
+1
View File
@@ -24,5 +24,6 @@ export default async function createPlugin(
logger: env.logger,
eventBroker: env.eventBroker,
identity: env.identity,
discovery: env.discovery,
});
}
+1
View File
@@ -22,6 +22,7 @@ export default async function createPlugin(
logger: env.logger,
eventBroker: env.eventBroker,
identity: env.identity,
discovery: env.discovery,
});
}
```
+3
View File
@@ -8,12 +8,15 @@ import { EventBroker } from '@backstage/plugin-events-node';
import express from 'express';
import { IdentityApi } from '@backstage/plugin-auth-node';
import { LoggerService } from '@backstage/backend-plugin-api';
import { PluginEndpointDiscovery } from '@backstage/backend-common';
// @public (undocumented)
export function createRouter(options: RouterOptions): Promise<express.Router>;
// @public (undocumented)
export interface RouterOptions {
// (undocumented)
discovery: PluginEndpointDiscovery;
// (undocumented)
eventBroker?: EventBroker;
// (undocumented)
+3 -1
View File
@@ -32,14 +32,16 @@ export const signalsPlugin = createBackendPlugin({
httpRouter: coreServices.httpRouter,
logger: coreServices.logger,
identity: coreServices.identity,
discovery: coreServices.discovery,
// TODO: EventBroker. It is optional for now but it's actually required so waiting for the new backend system
// for the events-backend for this to work.
},
async init({ httpRouter, logger, identity }) {
async init({ httpRouter, logger, identity, discovery }) {
httpRouter.use(
await createRouter({
logger,
identity,
discovery,
}),
);
},
@@ -89,7 +89,7 @@ describe('SignalManager', () => {
await onEvent({
topic: 'signals',
eventPayload: {
recipients: null,
receivers: null,
channel: 'test',
message: { msg: 'test' },
},
@@ -109,7 +109,7 @@ describe('SignalManager', () => {
await onEvent({
topic: 'signals',
eventPayload: {
recipients: null,
receivers: null,
channel: 'test',
message: { msg: 'test' },
},
@@ -162,7 +162,7 @@ describe('SignalManager', () => {
await onEvent({
topic: 'signals',
eventPayload: {
recipients: 'user:default/john.doe',
receivers: 'user:default/john.doe',
channel: 'test',
message: { msg: 'test' },
},
@@ -71,7 +71,9 @@ export class SignalManager {
id,
user: identity?.identity.userEntityRef ?? 'user:default/guest',
ws,
ownershipEntityRefs: identity?.identity.ownershipEntityRefs ?? [],
ownershipEntityRefs: identity?.identity.ownershipEntityRefs ?? [
'user:default/guest',
],
subscriptions: new Set<string>(),
};
@@ -130,8 +132,12 @@ export class SignalManager {
return;
}
const { channel, recipients, message } = eventPayload;
const { channel, receivers, message } = eventPayload;
const jsonMessage = JSON.stringify({ channel, message });
let users: string[] = [];
if (receivers !== null) {
users = Array.isArray(receivers) ? receivers : [receivers];
}
// Actual websocket message sending
this.connections.forEach(conn => {
@@ -140,10 +146,8 @@ export class SignalManager {
}
// Sending to all users can be done with null
if (
recipients !== null &&
!conn.ownershipEntityRefs.some((ref: string) =>
recipients.includes(ref),
)
receivers !== null &&
!conn.ownershipEntityRefs.some((ref: string) => users.includes(ref))
) {
return;
}
@@ -13,7 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { getVoidLogger } from '@backstage/backend-common';
import {
getVoidLogger,
PluginEndpointDiscovery,
} from '@backstage/backend-common';
import express from 'express';
import request from 'supertest';
@@ -30,6 +33,11 @@ const identityApiMock: jest.Mocked<IdentityApi> = {
getIdentity: jest.fn(),
};
const discovery: jest.Mocked<PluginEndpointDiscovery> = {
getBaseUrl: jest.fn().mockResolvedValue('/api/signals'),
getExternalBaseUrl: jest.fn(),
};
describe('createRouter', () => {
let app: express.Express;
@@ -38,6 +46,7 @@ describe('createRouter', () => {
logger: getVoidLogger(),
identity: identityApiMock,
eventBroker: eventBrokerMock,
discovery,
});
app = express().use(router);
});
@@ -13,7 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { errorHandler } from '@backstage/backend-common';
import {
errorHandler,
PluginEndpointDiscovery,
} from '@backstage/backend-common';
import express, { NextFunction, Request, Response } from 'express';
import Router from 'express-promise-router';
import { LoggerService } from '@backstage/backend-plugin-api';
@@ -33,13 +36,14 @@ export interface RouterOptions {
logger: LoggerService;
eventBroker?: EventBroker;
identity: IdentityApi;
discovery: PluginEndpointDiscovery;
}
/** @public */
export async function createRouter(
options: RouterOptions,
): Promise<express.Router> {
const { logger, identity } = options;
const { logger, identity, discovery } = options;
const manager = SignalManager.create(options);
let subscribedToUpgradeRequests = false;
@@ -66,9 +70,9 @@ export async function createRouter(
}
subscribedToUpgradeRequests = true;
const apiUrl = await discovery.getBaseUrl('signals');
server.on('upgrade', async (request, socket, head) => {
// TODO: Find a way to make this more generic
if (request.url !== '/api/signals') {
if (!request.url || !apiUrl.endsWith(request.url)) {
return;
}
@@ -68,6 +68,7 @@ export async function startStandaloneServer(
logger,
identity,
eventBroker,
discovery,
});
let service = createServiceBuilder(module)
@@ -83,7 +84,7 @@ export async function startStandaloneServer(
setInterval(() => {
signals.publish({
recipients: null,
receivers: null,
channel: 'test',
message: { hello: 'world' },
});
+3 -3
View File
@@ -16,15 +16,15 @@ export class DefaultSignalService implements SignalService {
// @public (undocumented)
export type SignalPayload = {
recipients: string[] | null;
receivers: string[] | string | null;
channel: string;
message: JsonObject;
};
// @public (undocumented)
export type SignalService = {
export interface SignalService {
publish(signal: SignalPayload): Promise<void>;
};
}
// @public (undocumented)
export const signalService: ServiceRef<SignalService, 'plugin'>;
@@ -37,11 +37,11 @@ export class DefaultSignalService implements SignalService {
* @param message - message to publish
*/
async publish(signal: SignalPayload) {
const { recipients, channel, message } = signal;
const { receivers, channel, message } = signal;
await this.eventBroker?.publish({
topic: 'signals',
eventPayload: {
recipients,
receivers,
message,
channel,
},
+2 -2
View File
@@ -16,9 +16,9 @@
import { SignalPayload } from './types';
/** @public */
export type SignalService = {
export interface SignalService {
/**
* Publishes a message to user refs to specific topic
*/
publish(signal: SignalPayload): Promise<void>;
};
}
+1 -1
View File
@@ -25,7 +25,7 @@ export type SignalServiceOptions = {
/** @public */
export type SignalPayload = {
recipients: string[] | null;
receivers: string[] | string | null;
channel: string;
message: JsonObject;
};
+10 -5
View File
@@ -7,18 +7,23 @@ import { ApiRef } from '@backstage/core-plugin-api';
import { JsonObject } from '@backstage/types';
// @public (undocumented)
export type SignalApi = {
export interface SignalApi {
// (undocumented)
subscribe(
channel: string,
onMessage: (message: JsonObject) => void,
): {
unsubscribe: () => void;
};
};
): SignalSubscriber;
}
// @public (undocumented)
export const signalApiRef: ApiRef<SignalApi>;
// @public (undocumented)
export interface SignalSubscriber {
// (undocumented)
unsubscribe(): void;
}
// @public (undocumented)
export const useSignal: (channel: string) => {
lastSignal: JsonObject | null;
+8 -3
View File
@@ -22,9 +22,14 @@ export const signalApiRef = createApiRef<SignalApi>({
});
/** @public */
export type SignalApi = {
export interface SignalSubscriber {
unsubscribe(): void;
}
/** @public */
export interface SignalApi {
subscribe(
channel: string,
onMessage: (message: JsonObject) => void,
): { unsubscribe: () => void };
};
): SignalSubscriber;
}
+14 -14
View File
@@ -146,20 +146,6 @@ export class SignalClient implements SignalApi {
url.protocol = url.protocol === 'http:' ? 'ws:' : 'wss:';
this.ws = new WebSocket(url.toString(), token);
this.ws.onmessage = (data: MessageEvent) => {
this.handleMessage(data);
};
this.ws.onerror = () => {
this.reconnect();
};
this.ws.onclose = (ev: CloseEvent) => {
if (ev.code !== WS_CLOSE_NORMAL && ev.code !== WS_CLOSE_GOING_AWAY) {
this.reconnect();
}
};
// Wait until connection is open
let connectSleep = 0;
while (
@@ -174,6 +160,20 @@ export class SignalClient implements SignalApi {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error('Connect timeout');
}
this.ws.onmessage = (data: MessageEvent) => {
this.handleMessage(data);
};
this.ws.onerror = () => {
this.reconnect();
};
this.ws.onclose = (ev: CloseEvent) => {
if (ev.code !== WS_CLOSE_NORMAL && ev.code !== WS_CLOSE_GOING_AWAY) {
this.reconnect();
}
};
}
private handleMessage(data: MessageEvent) {