feat(notifications): support notification send throttling

Signed-off-by: Hellgren Heikki <heikki.hellgren@op.fi>
This commit is contained in:
Hellgren Heikki
2025-03-18 12:16:14 +02:00
parent 37416e06d6
commit 9a6080e677
5 changed files with 127 additions and 56 deletions
+5
View File
@@ -0,0 +1,5 @@
---
'@backstage/plugin-notifications-backend': patch
---
Allow throttling notification sending not to block the system with huge amount of receiving users
+32
View File
@@ -0,0 +1,32 @@
/*
* Copyright 2024 The Backstage Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { HumanDuration } from '@backstage/types';
export interface Config {
/**
* Configuration options for notifications-backend
*/
notifications?: {
/**
* Concurrency limit for notification sending, defaults to 10
*/
concurrencyLimit?: number;
/**
* Throttle duration between notification sending, defaults to 50ms
*/
throttleInterval?: HumanDuration | string;
};
}
+5 -1
View File
@@ -26,6 +26,7 @@
"types": "src/index.ts",
"files": [
"dist",
"config.d.ts",
"migrations/**/*.{js,d.ts}"
],
"scripts": {
@@ -49,9 +50,11 @@
"@backstage/plugin-notifications-common": "workspace:^",
"@backstage/plugin-notifications-node": "workspace:^",
"@backstage/plugin-signals-node": "workspace:^",
"@backstage/types": "workspace:^",
"express": "^4.17.1",
"express-promise-router": "^4.1.0",
"knex": "^3.0.0",
"p-throttle": "^4.1.1",
"uuid": "^11.0.0",
"winston": "^3.2.1",
"yn": "^4.0.0"
@@ -68,5 +71,6 @@
"@types/supertest": "^2.0.8",
"msw": "^1.0.0",
"supertest": "^7.0.0"
}
},
"configSchema": "config.d.ts"
}
@@ -48,7 +48,9 @@ import {
} from '@backstage/plugin-notifications-common';
import { parseEntityOrderFieldParams } from './parseEntityOrderFieldParams';
import { getUsersForEntityRef } from './getUsersForEntityRef';
import { Config } from '@backstage/config';
import { Config, readDurationFromConfig } from '@backstage/config';
import { durationToMilliseconds } from '@backstage/types';
import pThrottle from 'p-throttle';
/** @internal */
export interface RouterOptions {
@@ -82,6 +84,19 @@ export async function createRouter(
const WEB_NOTIFICATION_CHANNEL = 'Web';
const store = await DatabaseNotificationsStore.create({ database });
const frontendBaseUrl = config.getString('app.baseUrl');
const concurrencyLimit =
config.getOptionalNumber('notifications.concurrencyLimit') ?? 10;
const throttleInterval = config.has('notifications.throttleInterval')
? durationToMilliseconds(
readDurationFromConfig(config, {
key: 'notifications.throttleInterval',
}),
)
: 50;
const throttle = pThrottle({
limit: concurrencyLimit,
interval: throttleInterval,
});
const getUser = async (req: Request<unknown>) => {
const credentials = await httpAuth.credentials(req, { allow: ['user'] });
@@ -470,71 +485,84 @@ export async function createRouter(
},
channel: 'notifications',
});
postProcessNotification(ret, opts);
}
postProcessNotification(ret, opts);
return notification;
};
const sendUserNotification = async (
baseNotification: Omit<Notification, 'user' | 'id'>,
user: string,
opts: NotificationSendOptions,
origin: string,
scope?: string,
): Promise<Notification | undefined> => {
const userNotification = {
...baseNotification,
id: uuid(),
user,
};
const notification = await preProcessNotification(userNotification, opts);
const enabled = await isNotificationsEnabled({
user,
channel: WEB_NOTIFICATION_CHANNEL,
origin: userNotification.origin,
});
let ret = notification;
if (!enabled) {
postProcessNotification(ret, opts);
return undefined;
}
let existingNotification;
if (scope) {
existingNotification = await store.getExistingScopeNotification({
user,
scope,
origin,
});
}
if (existingNotification) {
const restored = await store.restoreExistingNotification({
id: existingNotification.id,
notification,
});
ret = restored ?? notification;
} else {
await store.saveNotification(notification);
}
if (signals) {
await signals.publish<NewNotificationSignal>({
recipients: { type: 'user', entityRef: [user] },
message: {
action: 'new_notification',
notification_id: ret.id,
},
channel: 'notifications',
});
}
postProcessNotification(ret, opts);
return ret;
};
const sendUserNotifications = async (
baseNotification: Omit<Notification, 'user' | 'id'>,
users: string[],
opts: NotificationSendOptions,
origin: string,
) => {
const notifications = [];
): Promise<Notification[]> => {
const { scope } = opts.payload;
const uniqueUsers = [...new Set(users)];
for (const user of uniqueUsers) {
const userNotification = {
...baseNotification,
id: uuid(),
user,
};
const notification = await preProcessNotification(userNotification, opts);
const enabled = await isNotificationsEnabled({
user,
channel: WEB_NOTIFICATION_CHANNEL,
origin: userNotification.origin,
});
let ret = notification;
if (enabled) {
let existingNotification;
if (scope) {
existingNotification = await store.getExistingScopeNotification({
user,
scope,
origin,
});
}
if (existingNotification) {
const restored = await store.restoreExistingNotification({
id: existingNotification.id,
notification,
});
ret = restored ?? notification;
} else {
await store.saveNotification(notification);
}
notifications.push(ret);
if (signals) {
await signals.publish<NewNotificationSignal>({
recipients: { type: 'user', entityRef: [user] },
message: {
action: 'new_notification',
notification_id: ret.id,
},
channel: 'notifications',
});
}
}
postProcessNotification(ret, opts);
}
return notifications;
const throttled = throttle((user: string) =>
sendUserNotification(baseNotification, user, opts, origin, scope),
);
const sent = await Promise.all(uniqueUsers.map(user => throttled(user)));
return sent.filter(n => n !== undefined);
};
const createNotificationHandler = async (
+2
View File
@@ -7053,12 +7053,14 @@ __metadata:
"@backstage/plugin-notifications-node": "workspace:^"
"@backstage/plugin-signals-backend": "workspace:^"
"@backstage/plugin-signals-node": "workspace:^"
"@backstage/types": "workspace:^"
"@types/express": ^4.17.6
"@types/supertest": ^2.0.8
express: ^4.17.1
express-promise-router: ^4.1.0
knex: ^3.0.0
msw: ^1.0.0
p-throttle: ^4.1.1
supertest: ^7.0.0
uuid: ^11.0.0
winston: ^3.2.1