fix: signal disconnect loop on server start
this fixes the disconnect loop when the server has just started by waiting for the connection to open before subsribing to events. also includes other improvements: - rename types to interfaces - allow to input single receiver for signal - use discovery api to check for upgrade path instead hard coded one Signed-off-by: Heikki Hellgren <heikki.hellgren@op.fi>
This commit is contained in:
@@ -0,0 +1,8 @@
|
||||
---
|
||||
'@backstage/plugin-signals-backend': patch
|
||||
'@backstage/plugin-signals-react': patch
|
||||
'@backstage/plugin-signals-node': patch
|
||||
'@backstage/plugin-signals': patch
|
||||
---
|
||||
|
||||
Fix to disconnect loop on server start
|
||||
@@ -24,5 +24,6 @@ export default async function createPlugin(
|
||||
logger: env.logger,
|
||||
eventBroker: env.eventBroker,
|
||||
identity: env.identity,
|
||||
discovery: env.discovery,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ export default async function createPlugin(
|
||||
logger: env.logger,
|
||||
eventBroker: env.eventBroker,
|
||||
identity: env.identity,
|
||||
discovery: env.discovery,
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
@@ -8,12 +8,15 @@ import { EventBroker } from '@backstage/plugin-events-node';
|
||||
import express from 'express';
|
||||
import { IdentityApi } from '@backstage/plugin-auth-node';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
import { PluginEndpointDiscovery } from '@backstage/backend-common';
|
||||
|
||||
// @public (undocumented)
|
||||
export function createRouter(options: RouterOptions): Promise<express.Router>;
|
||||
|
||||
// @public (undocumented)
|
||||
export interface RouterOptions {
|
||||
// (undocumented)
|
||||
discovery: PluginEndpointDiscovery;
|
||||
// (undocumented)
|
||||
eventBroker?: EventBroker;
|
||||
// (undocumented)
|
||||
|
||||
@@ -32,14 +32,16 @@ export const signalsPlugin = createBackendPlugin({
|
||||
httpRouter: coreServices.httpRouter,
|
||||
logger: coreServices.logger,
|
||||
identity: coreServices.identity,
|
||||
discovery: coreServices.discovery,
|
||||
// TODO: EventBroker. It is optional for now but it's actually required so waiting for the new backend system
|
||||
// for the events-backend for this to work.
|
||||
},
|
||||
async init({ httpRouter, logger, identity }) {
|
||||
async init({ httpRouter, logger, identity, discovery }) {
|
||||
httpRouter.use(
|
||||
await createRouter({
|
||||
logger,
|
||||
identity,
|
||||
discovery,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
||||
@@ -89,7 +89,7 @@ describe('SignalManager', () => {
|
||||
await onEvent({
|
||||
topic: 'signals',
|
||||
eventPayload: {
|
||||
recipients: null,
|
||||
receivers: null,
|
||||
channel: 'test',
|
||||
message: { msg: 'test' },
|
||||
},
|
||||
@@ -109,7 +109,7 @@ describe('SignalManager', () => {
|
||||
await onEvent({
|
||||
topic: 'signals',
|
||||
eventPayload: {
|
||||
recipients: null,
|
||||
receivers: null,
|
||||
channel: 'test',
|
||||
message: { msg: 'test' },
|
||||
},
|
||||
@@ -162,7 +162,7 @@ describe('SignalManager', () => {
|
||||
await onEvent({
|
||||
topic: 'signals',
|
||||
eventPayload: {
|
||||
recipients: 'user:default/john.doe',
|
||||
receivers: 'user:default/john.doe',
|
||||
channel: 'test',
|
||||
message: { msg: 'test' },
|
||||
},
|
||||
|
||||
@@ -71,7 +71,9 @@ export class SignalManager {
|
||||
id,
|
||||
user: identity?.identity.userEntityRef ?? 'user:default/guest',
|
||||
ws,
|
||||
ownershipEntityRefs: identity?.identity.ownershipEntityRefs ?? [],
|
||||
ownershipEntityRefs: identity?.identity.ownershipEntityRefs ?? [
|
||||
'user:default/guest',
|
||||
],
|
||||
subscriptions: new Set<string>(),
|
||||
};
|
||||
|
||||
@@ -130,8 +132,12 @@ export class SignalManager {
|
||||
return;
|
||||
}
|
||||
|
||||
const { channel, recipients, message } = eventPayload;
|
||||
const { channel, receivers, message } = eventPayload;
|
||||
const jsonMessage = JSON.stringify({ channel, message });
|
||||
let users: string[] = [];
|
||||
if (receivers !== null) {
|
||||
users = Array.isArray(receivers) ? receivers : [receivers];
|
||||
}
|
||||
|
||||
// Actual websocket message sending
|
||||
this.connections.forEach(conn => {
|
||||
@@ -140,10 +146,8 @@ export class SignalManager {
|
||||
}
|
||||
// Sending to all users can be done with null
|
||||
if (
|
||||
recipients !== null &&
|
||||
!conn.ownershipEntityRefs.some((ref: string) =>
|
||||
recipients.includes(ref),
|
||||
)
|
||||
receivers !== null &&
|
||||
!conn.ownershipEntityRefs.some((ref: string) => users.includes(ref))
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -13,7 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { getVoidLogger } from '@backstage/backend-common';
|
||||
import {
|
||||
getVoidLogger,
|
||||
PluginEndpointDiscovery,
|
||||
} from '@backstage/backend-common';
|
||||
import express from 'express';
|
||||
import request from 'supertest';
|
||||
|
||||
@@ -30,6 +33,11 @@ const identityApiMock: jest.Mocked<IdentityApi> = {
|
||||
getIdentity: jest.fn(),
|
||||
};
|
||||
|
||||
const discovery: jest.Mocked<PluginEndpointDiscovery> = {
|
||||
getBaseUrl: jest.fn().mockResolvedValue('/api/signals'),
|
||||
getExternalBaseUrl: jest.fn(),
|
||||
};
|
||||
|
||||
describe('createRouter', () => {
|
||||
let app: express.Express;
|
||||
|
||||
@@ -38,6 +46,7 @@ describe('createRouter', () => {
|
||||
logger: getVoidLogger(),
|
||||
identity: identityApiMock,
|
||||
eventBroker: eventBrokerMock,
|
||||
discovery,
|
||||
});
|
||||
app = express().use(router);
|
||||
});
|
||||
|
||||
@@ -13,7 +13,10 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { errorHandler } from '@backstage/backend-common';
|
||||
import {
|
||||
errorHandler,
|
||||
PluginEndpointDiscovery,
|
||||
} from '@backstage/backend-common';
|
||||
import express, { NextFunction, Request, Response } from 'express';
|
||||
import Router from 'express-promise-router';
|
||||
import { LoggerService } from '@backstage/backend-plugin-api';
|
||||
@@ -33,13 +36,14 @@ export interface RouterOptions {
|
||||
logger: LoggerService;
|
||||
eventBroker?: EventBroker;
|
||||
identity: IdentityApi;
|
||||
discovery: PluginEndpointDiscovery;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
export async function createRouter(
|
||||
options: RouterOptions,
|
||||
): Promise<express.Router> {
|
||||
const { logger, identity } = options;
|
||||
const { logger, identity, discovery } = options;
|
||||
const manager = SignalManager.create(options);
|
||||
let subscribedToUpgradeRequests = false;
|
||||
|
||||
@@ -66,9 +70,9 @@ export async function createRouter(
|
||||
}
|
||||
|
||||
subscribedToUpgradeRequests = true;
|
||||
const apiUrl = await discovery.getBaseUrl('signals');
|
||||
server.on('upgrade', async (request, socket, head) => {
|
||||
// TODO: Find a way to make this more generic
|
||||
if (request.url !== '/api/signals') {
|
||||
if (!request.url || !apiUrl.endsWith(request.url)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -68,6 +68,7 @@ export async function startStandaloneServer(
|
||||
logger,
|
||||
identity,
|
||||
eventBroker,
|
||||
discovery,
|
||||
});
|
||||
|
||||
let service = createServiceBuilder(module)
|
||||
@@ -83,7 +84,7 @@ export async function startStandaloneServer(
|
||||
|
||||
setInterval(() => {
|
||||
signals.publish({
|
||||
recipients: null,
|
||||
receivers: null,
|
||||
channel: 'test',
|
||||
message: { hello: 'world' },
|
||||
});
|
||||
|
||||
@@ -16,15 +16,15 @@ export class DefaultSignalService implements SignalService {
|
||||
|
||||
// @public (undocumented)
|
||||
export type SignalPayload = {
|
||||
recipients: string[] | null;
|
||||
receivers: string[] | string | null;
|
||||
channel: string;
|
||||
message: JsonObject;
|
||||
};
|
||||
|
||||
// @public (undocumented)
|
||||
export type SignalService = {
|
||||
export interface SignalService {
|
||||
publish(signal: SignalPayload): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const signalService: ServiceRef<SignalService, 'plugin'>;
|
||||
|
||||
@@ -37,11 +37,11 @@ export class DefaultSignalService implements SignalService {
|
||||
* @param message - message to publish
|
||||
*/
|
||||
async publish(signal: SignalPayload) {
|
||||
const { recipients, channel, message } = signal;
|
||||
const { receivers, channel, message } = signal;
|
||||
await this.eventBroker?.publish({
|
||||
topic: 'signals',
|
||||
eventPayload: {
|
||||
recipients,
|
||||
receivers,
|
||||
message,
|
||||
channel,
|
||||
},
|
||||
|
||||
@@ -16,9 +16,9 @@
|
||||
import { SignalPayload } from './types';
|
||||
|
||||
/** @public */
|
||||
export type SignalService = {
|
||||
export interface SignalService {
|
||||
/**
|
||||
* Publishes a message to user refs to specific topic
|
||||
*/
|
||||
publish(signal: SignalPayload): Promise<void>;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ export type SignalServiceOptions = {
|
||||
|
||||
/** @public */
|
||||
export type SignalPayload = {
|
||||
recipients: string[] | null;
|
||||
receivers: string[] | string | null;
|
||||
channel: string;
|
||||
message: JsonObject;
|
||||
};
|
||||
|
||||
@@ -7,18 +7,23 @@ import { ApiRef } from '@backstage/core-plugin-api';
|
||||
import { JsonObject } from '@backstage/types';
|
||||
|
||||
// @public (undocumented)
|
||||
export type SignalApi = {
|
||||
export interface SignalApi {
|
||||
// (undocumented)
|
||||
subscribe(
|
||||
channel: string,
|
||||
onMessage: (message: JsonObject) => void,
|
||||
): {
|
||||
unsubscribe: () => void;
|
||||
};
|
||||
};
|
||||
): SignalSubscriber;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const signalApiRef: ApiRef<SignalApi>;
|
||||
|
||||
// @public (undocumented)
|
||||
export interface SignalSubscriber {
|
||||
// (undocumented)
|
||||
unsubscribe(): void;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const useSignal: (channel: string) => {
|
||||
lastSignal: JsonObject | null;
|
||||
|
||||
@@ -22,9 +22,14 @@ export const signalApiRef = createApiRef<SignalApi>({
|
||||
});
|
||||
|
||||
/** @public */
|
||||
export type SignalApi = {
|
||||
export interface SignalSubscriber {
|
||||
unsubscribe(): void;
|
||||
}
|
||||
|
||||
/** @public */
|
||||
export interface SignalApi {
|
||||
subscribe(
|
||||
channel: string,
|
||||
onMessage: (message: JsonObject) => void,
|
||||
): { unsubscribe: () => void };
|
||||
};
|
||||
): SignalSubscriber;
|
||||
}
|
||||
|
||||
@@ -146,20 +146,6 @@ export class SignalClient implements SignalApi {
|
||||
url.protocol = url.protocol === 'http:' ? 'ws:' : 'wss:';
|
||||
this.ws = new WebSocket(url.toString(), token);
|
||||
|
||||
this.ws.onmessage = (data: MessageEvent) => {
|
||||
this.handleMessage(data);
|
||||
};
|
||||
|
||||
this.ws.onerror = () => {
|
||||
this.reconnect();
|
||||
};
|
||||
|
||||
this.ws.onclose = (ev: CloseEvent) => {
|
||||
if (ev.code !== WS_CLOSE_NORMAL && ev.code !== WS_CLOSE_GOING_AWAY) {
|
||||
this.reconnect();
|
||||
}
|
||||
};
|
||||
|
||||
// Wait until connection is open
|
||||
let connectSleep = 0;
|
||||
while (
|
||||
@@ -174,6 +160,20 @@ export class SignalClient implements SignalApi {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
throw new Error('Connect timeout');
|
||||
}
|
||||
|
||||
this.ws.onmessage = (data: MessageEvent) => {
|
||||
this.handleMessage(data);
|
||||
};
|
||||
|
||||
this.ws.onerror = () => {
|
||||
this.reconnect();
|
||||
};
|
||||
|
||||
this.ws.onclose = (ev: CloseEvent) => {
|
||||
if (ev.code !== WS_CLOSE_NORMAL && ev.code !== WS_CLOSE_GOING_AWAY) {
|
||||
this.reconnect();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private handleMessage(data: MessageEvent) {
|
||||
|
||||
Reference in New Issue
Block a user