feat(events): allow setting a timeout for event bus polling
Signed-off-by: Phil Kuang <pkuang@factset.com>
This commit is contained in:
@@ -0,0 +1,11 @@
|
||||
---
|
||||
'@backstage/plugin-events-backend': patch
|
||||
'@backstage/plugin-events-node': patch
|
||||
---
|
||||
|
||||
Allow configuring a timeout for event bus polling requests. This can be set like so in your app-config:
|
||||
|
||||
```yaml
|
||||
events:
|
||||
notifyTimeoutMs: 30000
|
||||
```
|
||||
Vendored
+6
@@ -16,6 +16,12 @@
|
||||
|
||||
export interface Config {
|
||||
events?: {
|
||||
/**
|
||||
* Timeout in milliseconds for how long to wait before closing subscription events
|
||||
* requests to ensure they don't stall or that events get stuck. Defaults to 55 seconds.
|
||||
*/
|
||||
notifyTimeoutMs?: number;
|
||||
|
||||
http?: {
|
||||
/**
|
||||
* Topics for which a route has to be registered
|
||||
|
||||
@@ -113,6 +113,10 @@ export const eventsPlugin = createBackendPlugin({
|
||||
// that is used there as part of the middleware stack.
|
||||
httpRouter.use(eventsRouter);
|
||||
|
||||
const notifyTimeoutMs = config.getOptionalNumber(
|
||||
'events.notifyTimeoutMs',
|
||||
);
|
||||
|
||||
httpRouter.use(
|
||||
await createEventBusRouter({
|
||||
database,
|
||||
@@ -120,6 +124,7 @@ export const eventsPlugin = createBackendPlugin({
|
||||
logger,
|
||||
httpAuth,
|
||||
scheduler,
|
||||
notifyTimeoutMs,
|
||||
}),
|
||||
);
|
||||
|
||||
|
||||
@@ -26,7 +26,10 @@ import { createOpenApiRouter } from '../../schema/openapi.generated';
|
||||
import { MemoryEventBusStore } from './MemoryEventBusStore';
|
||||
import { DatabaseEventBusStore } from './DatabaseEventBusStore';
|
||||
import { EventBusStore } from './types';
|
||||
import { EventParams } from '@backstage/plugin-events-node';
|
||||
import {
|
||||
EVENTS_NOTIFY_TIMEOUT_HEADER,
|
||||
EventParams,
|
||||
} from '@backstage/plugin-events-node';
|
||||
|
||||
const DEFAULT_NOTIFY_TIMEOUT_MS = 55_000; // Just below 60s, which is a common HTTP timeout
|
||||
|
||||
@@ -229,6 +232,10 @@ export async function createEventBusRouter(options: {
|
||||
})),
|
||||
});
|
||||
} else {
|
||||
res.setHeader(
|
||||
EVENTS_NOTIFY_TIMEOUT_HEADER,
|
||||
notifyTimeoutMs.toString(),
|
||||
);
|
||||
res.status(202);
|
||||
res.flushHeaders();
|
||||
|
||||
|
||||
@@ -74,6 +74,9 @@ export abstract class EventRouter {
|
||||
subscribe(): Promise<void>;
|
||||
}
|
||||
|
||||
// @public (undocumented)
|
||||
export const EVENTS_NOTIFY_TIMEOUT_HEADER = 'backstage-events-notify-timeout';
|
||||
|
||||
// @public
|
||||
export interface EventsService {
|
||||
publish(params: EventParams): Promise<void>;
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import { DefaultEventsService } from './DefaultEventsService';
|
||||
import { EventParams } from './EventParams';
|
||||
import { EVENTS_NOTIFY_TIMEOUT_HEADER } from './EventsService';
|
||||
import { rest } from 'msw';
|
||||
import { setupServer } from 'msw/node';
|
||||
import {
|
||||
@@ -248,6 +249,101 @@ describe('DefaultEventsService', () => {
|
||||
blockingController!.close();
|
||||
});
|
||||
|
||||
it('should timeout polling if the response has a timeout header', async () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const service = DefaultEventsService.create({ logger }).forPlugin('a', {
|
||||
auth: mockServices.auth(),
|
||||
logger,
|
||||
discovery: mockServices.discovery(),
|
||||
lifecycle: mockServices.lifecycle.mock(),
|
||||
});
|
||||
|
||||
let callCount = 0;
|
||||
|
||||
let blockingController: ReadableStreamDefaultController;
|
||||
const blockingStream = new ReadableStream({
|
||||
start(controller) {
|
||||
blockingController = controller;
|
||||
},
|
||||
});
|
||||
|
||||
mswServer.use(
|
||||
rest.put(
|
||||
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester',
|
||||
(_req, res, ctx) => res(ctx.status(200)),
|
||||
),
|
||||
// The first and third calls result in a blocking 202 that is resolved after 100ms
|
||||
// The second and fourth calls result in a 200 with an event
|
||||
// The fifth call blocks until the end of the test
|
||||
// No more than 5 calls should be made
|
||||
rest.get(
|
||||
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester/events',
|
||||
(_req, res, ctx) => {
|
||||
callCount += 1;
|
||||
if (callCount === 1 || callCount === 3) {
|
||||
return res(
|
||||
ctx.status(202),
|
||||
ctx.body(
|
||||
new ReadableStream({
|
||||
start(controller) {
|
||||
setTimeout(() => controller.close(), 100);
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
} else if (callCount === 2 || callCount === 4) {
|
||||
return res(
|
||||
ctx.status(200),
|
||||
ctx.json({
|
||||
events: [{ topic: 'test', payload: { callCount } }],
|
||||
}),
|
||||
);
|
||||
} else if (callCount === 5) {
|
||||
// 5th call has a timeout header so polling should proceed to the next call
|
||||
return res(
|
||||
ctx.set(EVENTS_NOTIFY_TIMEOUT_HEADER, '100'),
|
||||
ctx.status(202),
|
||||
ctx.body(blockingStream),
|
||||
);
|
||||
} else if (callCount === 6) {
|
||||
return res(ctx.status(202), ctx.body(blockingStream));
|
||||
}
|
||||
throw new Error(`events endpoint called too many times`);
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
const event = await new Promise(resolve => {
|
||||
const events = new Array<EventParams>();
|
||||
service.subscribe({
|
||||
id: 'tester',
|
||||
topics: ['test'],
|
||||
async onEvent(newEvent) {
|
||||
events.push(newEvent);
|
||||
if (events.length === 2) {
|
||||
resolve(events);
|
||||
}
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
expect(event).toEqual([
|
||||
{ topic: 'test', eventPayload: { callCount: 2 } },
|
||||
{ topic: 'test', eventPayload: { callCount: 4 } },
|
||||
]);
|
||||
|
||||
// Wait to allow timeout to trigger and make sure no additional calls happen
|
||||
await new Promise(resolve => setTimeout(resolve, 2000));
|
||||
|
||||
expect(callCount).toBe(6);
|
||||
|
||||
// Internal call to clean up subscriptions
|
||||
await (service as any).shutdown();
|
||||
|
||||
// Close the stream for the 5th call so that we don't leave the request hanging
|
||||
blockingController!.close();
|
||||
});
|
||||
|
||||
it('should not read events from bus if disabled', async () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const service = DefaultEventsService.create({
|
||||
|
||||
@@ -22,7 +22,11 @@ import {
|
||||
RootConfigService,
|
||||
} from '@backstage/backend-plugin-api';
|
||||
import { EventParams } from './EventParams';
|
||||
import { EventsService, EventsServiceSubscribeOptions } from './EventsService';
|
||||
import {
|
||||
EVENTS_NOTIFY_TIMEOUT_HEADER,
|
||||
EventsService,
|
||||
EventsServiceSubscribeOptions,
|
||||
} from './EventsService';
|
||||
import { DefaultApiClient } from '../generated';
|
||||
import { ResponseError } from '@backstage/errors';
|
||||
|
||||
@@ -214,10 +218,28 @@ class PluginEventsService implements EventsService {
|
||||
// immediately again
|
||||
|
||||
lock.release();
|
||||
// We don't actually expect any response body here, but waiting for
|
||||
// an empty body to be returned has been more reliable that waiting
|
||||
// for the response body stream to close.
|
||||
await res.text();
|
||||
|
||||
const notifyTimeoutHeader = res.headers.get(
|
||||
EVENTS_NOTIFY_TIMEOUT_HEADER,
|
||||
);
|
||||
|
||||
// Add 1s to the timeout to allow the server to potentially timeout first
|
||||
const notifyTimeoutMs =
|
||||
notifyTimeoutHeader && !isNaN(parseInt(notifyTimeoutHeader, 10))
|
||||
? Number(notifyTimeoutHeader) + 1_000
|
||||
: null;
|
||||
|
||||
await Promise.race(
|
||||
[
|
||||
// We don't actually expect any response body here, but waiting for
|
||||
// an empty body to be returned has been more reliable that waiting
|
||||
// for the response body stream to close.
|
||||
res.text(),
|
||||
notifyTimeoutMs
|
||||
? new Promise(resolve => setTimeout(resolve, notifyTimeoutMs))
|
||||
: null,
|
||||
].filter(Boolean),
|
||||
);
|
||||
} else if (res.status === 200) {
|
||||
const data = await res.json();
|
||||
if (data) {
|
||||
|
||||
@@ -55,3 +55,8 @@ export type EventsServiceSubscribeOptions = {
|
||||
* @public
|
||||
*/
|
||||
export type EventsServiceEventHandler = (params: EventParams) => Promise<void>;
|
||||
|
||||
/**
|
||||
* @public
|
||||
*/
|
||||
export const EVENTS_NOTIFY_TIMEOUT_HEADER = 'backstage-events-notify-timeout';
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
export type { EventParams } from './EventParams';
|
||||
export { EventRouter } from './EventRouter';
|
||||
export { EVENTS_NOTIFY_TIMEOUT_HEADER } from './EventsService';
|
||||
export type {
|
||||
EventsService,
|
||||
EventsServiceSubscribeOptions,
|
||||
|
||||
Reference in New Issue
Block a user