feat(events): allow setting a timeout for event bus polling

Signed-off-by: Phil Kuang <pkuang@factset.com>
This commit is contained in:
Phil Kuang
2024-10-23 21:30:07 -04:00
parent b2070f3eee
commit 1577511333
9 changed files with 162 additions and 6 deletions
+11
View File
@@ -0,0 +1,11 @@
---
'@backstage/plugin-events-backend': patch
'@backstage/plugin-events-node': patch
---
Allow configuring a timeout for event bus polling requests. This can be set like so in your app-config:
```yaml
events:
notifyTimeoutMs: 30000
```
+6
View File
@@ -16,6 +16,12 @@
export interface Config {
events?: {
/**
* Timeout in milliseconds for how long to wait before closing subscription events
* requests to ensure they don't stall or that events get stuck. Defaults to 55 seconds.
*/
notifyTimeoutMs?: number;
http?: {
/**
* Topics for which a route has to be registered
@@ -113,6 +113,10 @@ export const eventsPlugin = createBackendPlugin({
// that is used there as part of the middleware stack.
httpRouter.use(eventsRouter);
const notifyTimeoutMs = config.getOptionalNumber(
'events.notifyTimeoutMs',
);
httpRouter.use(
await createEventBusRouter({
database,
@@ -120,6 +124,7 @@ export const eventsPlugin = createBackendPlugin({
logger,
httpAuth,
scheduler,
notifyTimeoutMs,
}),
);
@@ -26,7 +26,10 @@ import { createOpenApiRouter } from '../../schema/openapi.generated';
import { MemoryEventBusStore } from './MemoryEventBusStore';
import { DatabaseEventBusStore } from './DatabaseEventBusStore';
import { EventBusStore } from './types';
import { EventParams } from '@backstage/plugin-events-node';
import {
EVENTS_NOTIFY_TIMEOUT_HEADER,
EventParams,
} from '@backstage/plugin-events-node';
const DEFAULT_NOTIFY_TIMEOUT_MS = 55_000; // Just below 60s, which is a common HTTP timeout
@@ -229,6 +232,10 @@ export async function createEventBusRouter(options: {
})),
});
} else {
res.setHeader(
EVENTS_NOTIFY_TIMEOUT_HEADER,
notifyTimeoutMs.toString(),
);
res.status(202);
res.flushHeaders();
+3
View File
@@ -74,6 +74,9 @@ export abstract class EventRouter {
subscribe(): Promise<void>;
}
// @public (undocumented)
export const EVENTS_NOTIFY_TIMEOUT_HEADER = 'backstage-events-notify-timeout';
// @public
export interface EventsService {
publish(params: EventParams): Promise<void>;
@@ -16,6 +16,7 @@
import { DefaultEventsService } from './DefaultEventsService';
import { EventParams } from './EventParams';
import { EVENTS_NOTIFY_TIMEOUT_HEADER } from './EventsService';
import { rest } from 'msw';
import { setupServer } from 'msw/node';
import {
@@ -248,6 +249,101 @@ describe('DefaultEventsService', () => {
blockingController!.close();
});
it('should timeout polling if the response has a timeout header', async () => {
const logger = mockServices.logger.mock();
const service = DefaultEventsService.create({ logger }).forPlugin('a', {
auth: mockServices.auth(),
logger,
discovery: mockServices.discovery(),
lifecycle: mockServices.lifecycle.mock(),
});
let callCount = 0;
let blockingController: ReadableStreamDefaultController;
const blockingStream = new ReadableStream({
start(controller) {
blockingController = controller;
},
});
mswServer.use(
rest.put(
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester',
(_req, res, ctx) => res(ctx.status(200)),
),
// The first and third calls result in a blocking 202 that is resolved after 100ms
// The second and fourth calls result in a 200 with an event
// The fifth call blocks until the end of the test
// No more than 5 calls should be made
rest.get(
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester/events',
(_req, res, ctx) => {
callCount += 1;
if (callCount === 1 || callCount === 3) {
return res(
ctx.status(202),
ctx.body(
new ReadableStream({
start(controller) {
setTimeout(() => controller.close(), 100);
},
}),
),
);
} else if (callCount === 2 || callCount === 4) {
return res(
ctx.status(200),
ctx.json({
events: [{ topic: 'test', payload: { callCount } }],
}),
);
} else if (callCount === 5) {
// 5th call has a timeout header so polling should proceed to the next call
return res(
ctx.set(EVENTS_NOTIFY_TIMEOUT_HEADER, '100'),
ctx.status(202),
ctx.body(blockingStream),
);
} else if (callCount === 6) {
return res(ctx.status(202), ctx.body(blockingStream));
}
throw new Error(`events endpoint called too many times`);
},
),
);
const event = await new Promise(resolve => {
const events = new Array<EventParams>();
service.subscribe({
id: 'tester',
topics: ['test'],
async onEvent(newEvent) {
events.push(newEvent);
if (events.length === 2) {
resolve(events);
}
},
});
});
expect(event).toEqual([
{ topic: 'test', eventPayload: { callCount: 2 } },
{ topic: 'test', eventPayload: { callCount: 4 } },
]);
// Wait to allow timeout to trigger and make sure no additional calls happen
await new Promise(resolve => setTimeout(resolve, 2000));
expect(callCount).toBe(6);
// Internal call to clean up subscriptions
await (service as any).shutdown();
// Close the stream for the 5th call so that we don't leave the request hanging
blockingController!.close();
});
it('should not read events from bus if disabled', async () => {
const logger = mockServices.logger.mock();
const service = DefaultEventsService.create({
@@ -22,7 +22,11 @@ import {
RootConfigService,
} from '@backstage/backend-plugin-api';
import { EventParams } from './EventParams';
import { EventsService, EventsServiceSubscribeOptions } from './EventsService';
import {
EVENTS_NOTIFY_TIMEOUT_HEADER,
EventsService,
EventsServiceSubscribeOptions,
} from './EventsService';
import { DefaultApiClient } from '../generated';
import { ResponseError } from '@backstage/errors';
@@ -214,10 +218,28 @@ class PluginEventsService implements EventsService {
// immediately again
lock.release();
// We don't actually expect any response body here, but waiting for
// an empty body to be returned has been more reliable that waiting
// for the response body stream to close.
await res.text();
const notifyTimeoutHeader = res.headers.get(
EVENTS_NOTIFY_TIMEOUT_HEADER,
);
// Add 1s to the timeout to allow the server to potentially timeout first
const notifyTimeoutMs =
notifyTimeoutHeader && !isNaN(parseInt(notifyTimeoutHeader, 10))
? Number(notifyTimeoutHeader) + 1_000
: null;
await Promise.race(
[
// We don't actually expect any response body here, but waiting for
// an empty body to be returned has been more reliable that waiting
// for the response body stream to close.
res.text(),
notifyTimeoutMs
? new Promise(resolve => setTimeout(resolve, notifyTimeoutMs))
: null,
].filter(Boolean),
);
} else if (res.status === 200) {
const data = await res.json();
if (data) {
@@ -55,3 +55,8 @@ export type EventsServiceSubscribeOptions = {
* @public
*/
export type EventsServiceEventHandler = (params: EventParams) => Promise<void>;
/**
* @public
*/
export const EVENTS_NOTIFY_TIMEOUT_HEADER = 'backstage-events-notify-timeout';
+1
View File
@@ -16,6 +16,7 @@
export type { EventParams } from './EventParams';
export { EventRouter } from './EventRouter';
export { EVENTS_NOTIFY_TIMEOUT_HEADER } from './EventsService';
export type {
EventsService,
EventsServiceSubscribeOptions,