events-node: fix event bus poll duplication

Signed-off-by: Patrik Oldsberg <poldsberg@gmail.com>
This commit is contained in:
Patrik Oldsberg
2024-10-31 10:46:50 +01:00
parent b8683b83c1
commit 0b57aa1495
3 changed files with 109 additions and 22 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-events-node': patch
---
Fixed an issue where the event bus polling would duplicate and increase exponentially over time.
@@ -160,6 +160,94 @@ describe('DefaultEventsService', () => {
await (service as any).shutdown();
});
it('should wait an poll on timeout', async () => {
const logger = mockServices.logger.mock();
const service = DefaultEventsService.create({ logger }).forPlugin('a', {
auth: mockServices.auth(),
logger,
discovery: mockServices.discovery(),
lifecycle: mockServices.lifecycle.mock(),
});
let callCount = 0;
let blockingController: ReadableStreamDefaultController;
const blockingStream = new ReadableStream({
start(controller) {
blockingController = controller;
},
});
mswServer.use(
rest.put(
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester',
(_req, res, ctx) => res(ctx.status(200)),
),
// The first and third calls result in a blocking 202 that is resolved after 100ms
// The second and fourth calls result in a 200 with an event
// The fifth call blocks until the end of the test
// No more than 5 calls should be made
rest.get(
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester/events',
(_req, res, ctx) => {
callCount += 1;
if (callCount === 1 || callCount === 3) {
return res(
ctx.status(202),
ctx.body(
new ReadableStream({
start(controller) {
setTimeout(() => controller.close(), 100);
},
}),
),
);
} else if (callCount === 2 || callCount === 4) {
return res(
ctx.status(200),
ctx.json({
events: [{ topic: 'test', payload: { callCount } }],
}),
);
} else if (callCount === 5) {
return res(ctx.status(202), ctx.body(blockingStream));
}
throw new Error(`events endpoint called too many times`);
},
),
);
const event = await new Promise(resolve => {
const events = new Array<EventParams>();
service.subscribe({
id: 'tester',
topics: ['test'],
async onEvent(newEvent) {
events.push(newEvent);
if (events.length === 2) {
resolve(events);
}
},
});
});
expect(event).toEqual([
{ topic: 'test', eventPayload: { callCount: 2 } },
{ topic: 'test', eventPayload: { callCount: 4 } },
]);
// Wait to make sure no additional calls happen
await new Promise(resolve => setTimeout(resolve, 100));
expect(callCount).toBe(5);
// Internal call to clean up subscriptions
await (service as any).shutdown();
// Close the stream for the 5th call so that we don't leave the request hanging
blockingController!.close();
});
it('should not read events from bus if disabled', async () => {
const logger = mockServices.logger.mock();
const service = DefaultEventsService.create({
@@ -207,28 +207,14 @@ class PluginEventsService implements EventsService {
{ token },
);
if (!res.ok) {
if (res.status === 404) {
this.logger.info(
`Polling event subscription resulted in a 404, recreating subscription`,
);
hasSubscription = false;
} else {
throw await ResponseError.fromResponse(res);
}
}
// Successful response, reset backoff
backoffMs = POLL_BACKOFF_START_MS;
// 202 means there were no immediately available events, but the
// response will block until either new events are available or the
// request times out. In both cases we should should try to read events
// immediately again
if (res.status === 202) {
// 202 means there were no immediately available events, but the
// response will block until either new events are available or the
// request times out. In both cases we should should try to read events
// immediately again
lock.release();
await res.body?.getReader()?.closed;
process.nextTick(poll);
} else if (res.status === 200) {
const data = await res.json();
if (data) {
@@ -245,10 +231,15 @@ class PluginEventsService implements EventsService {
);
}
}
} else {
this.logger.warn(
`Unexpected response status ${res.status} from events backend for subscription "${subscriptionId}"`,
}
} else {
if (res.status === 404) {
this.logger.info(
`Polling event subscription resulted in a 404, recreating subscription`,
);
hasSubscription = false;
} else {
throw await ResponseError.fromResponse(res);
}
}
}
@@ -276,6 +267,9 @@ class PluginEventsService implements EventsService {
}
}
// No errors, reset backoff
backoffMs = POLL_BACKOFF_START_MS;
process.nextTick(poll);
} catch (error) {
this.logger.warn(