events-node: fix event bus poll duplication
Signed-off-by: Patrik Oldsberg <poldsberg@gmail.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-events-node': patch
|
||||
---
|
||||
|
||||
Fixed an issue where the event bus polling would duplicate and increase exponentially over time.
|
||||
@@ -160,6 +160,94 @@ describe('DefaultEventsService', () => {
|
||||
await (service as any).shutdown();
|
||||
});
|
||||
|
||||
it('should wait an poll on timeout', async () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const service = DefaultEventsService.create({ logger }).forPlugin('a', {
|
||||
auth: mockServices.auth(),
|
||||
logger,
|
||||
discovery: mockServices.discovery(),
|
||||
lifecycle: mockServices.lifecycle.mock(),
|
||||
});
|
||||
|
||||
let callCount = 0;
|
||||
|
||||
let blockingController: ReadableStreamDefaultController;
|
||||
const blockingStream = new ReadableStream({
|
||||
start(controller) {
|
||||
blockingController = controller;
|
||||
},
|
||||
});
|
||||
|
||||
mswServer.use(
|
||||
rest.put(
|
||||
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester',
|
||||
(_req, res, ctx) => res(ctx.status(200)),
|
||||
),
|
||||
// The first and third calls result in a blocking 202 that is resolved after 100ms
|
||||
// The second and fourth calls result in a 200 with an event
|
||||
// The fifth call blocks until the end of the test
|
||||
// No more than 5 calls should be made
|
||||
rest.get(
|
||||
'http://localhost:0/api/events/bus/v1/subscriptions/a.tester/events',
|
||||
(_req, res, ctx) => {
|
||||
callCount += 1;
|
||||
if (callCount === 1 || callCount === 3) {
|
||||
return res(
|
||||
ctx.status(202),
|
||||
ctx.body(
|
||||
new ReadableStream({
|
||||
start(controller) {
|
||||
setTimeout(() => controller.close(), 100);
|
||||
},
|
||||
}),
|
||||
),
|
||||
);
|
||||
} else if (callCount === 2 || callCount === 4) {
|
||||
return res(
|
||||
ctx.status(200),
|
||||
ctx.json({
|
||||
events: [{ topic: 'test', payload: { callCount } }],
|
||||
}),
|
||||
);
|
||||
} else if (callCount === 5) {
|
||||
return res(ctx.status(202), ctx.body(blockingStream));
|
||||
}
|
||||
throw new Error(`events endpoint called too many times`);
|
||||
},
|
||||
),
|
||||
);
|
||||
|
||||
const event = await new Promise(resolve => {
|
||||
const events = new Array<EventParams>();
|
||||
service.subscribe({
|
||||
id: 'tester',
|
||||
topics: ['test'],
|
||||
async onEvent(newEvent) {
|
||||
events.push(newEvent);
|
||||
if (events.length === 2) {
|
||||
resolve(events);
|
||||
}
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
expect(event).toEqual([
|
||||
{ topic: 'test', eventPayload: { callCount: 2 } },
|
||||
{ topic: 'test', eventPayload: { callCount: 4 } },
|
||||
]);
|
||||
|
||||
// Wait to make sure no additional calls happen
|
||||
await new Promise(resolve => setTimeout(resolve, 100));
|
||||
|
||||
expect(callCount).toBe(5);
|
||||
|
||||
// Internal call to clean up subscriptions
|
||||
await (service as any).shutdown();
|
||||
|
||||
// Close the stream for the 5th call so that we don't leave the request hanging
|
||||
blockingController!.close();
|
||||
});
|
||||
|
||||
it('should not read events from bus if disabled', async () => {
|
||||
const logger = mockServices.logger.mock();
|
||||
const service = DefaultEventsService.create({
|
||||
|
||||
@@ -207,28 +207,14 @@ class PluginEventsService implements EventsService {
|
||||
{ token },
|
||||
);
|
||||
|
||||
if (!res.ok) {
|
||||
if (res.status === 404) {
|
||||
this.logger.info(
|
||||
`Polling event subscription resulted in a 404, recreating subscription`,
|
||||
);
|
||||
hasSubscription = false;
|
||||
} else {
|
||||
throw await ResponseError.fromResponse(res);
|
||||
}
|
||||
}
|
||||
|
||||
// Successful response, reset backoff
|
||||
backoffMs = POLL_BACKOFF_START_MS;
|
||||
|
||||
// 202 means there were no immediately available events, but the
|
||||
// response will block until either new events are available or the
|
||||
// request times out. In both cases we should should try to read events
|
||||
// immediately again
|
||||
if (res.status === 202) {
|
||||
// 202 means there were no immediately available events, but the
|
||||
// response will block until either new events are available or the
|
||||
// request times out. In both cases we should should try to read events
|
||||
// immediately again
|
||||
|
||||
lock.release();
|
||||
await res.body?.getReader()?.closed;
|
||||
process.nextTick(poll);
|
||||
} else if (res.status === 200) {
|
||||
const data = await res.json();
|
||||
if (data) {
|
||||
@@ -245,10 +231,15 @@ class PluginEventsService implements EventsService {
|
||||
);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this.logger.warn(
|
||||
`Unexpected response status ${res.status} from events backend for subscription "${subscriptionId}"`,
|
||||
}
|
||||
} else {
|
||||
if (res.status === 404) {
|
||||
this.logger.info(
|
||||
`Polling event subscription resulted in a 404, recreating subscription`,
|
||||
);
|
||||
hasSubscription = false;
|
||||
} else {
|
||||
throw await ResponseError.fromResponse(res);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -276,6 +267,9 @@ class PluginEventsService implements EventsService {
|
||||
}
|
||||
}
|
||||
|
||||
// No errors, reset backoff
|
||||
backoffMs = POLL_BACKOFF_START_MS;
|
||||
|
||||
process.nextTick(poll);
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
|
||||
Reference in New Issue
Block a user