feat(notifications): support notification send throttling
Signed-off-by: Hellgren Heikki <heikki.hellgren@op.fi>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
---
|
||||
'@backstage/plugin-notifications-backend': patch
|
||||
---
|
||||
|
||||
Allow throttling notification sending not to block the system with huge amount of receiving users
|
||||
+32
@@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Copyright 2024 The Backstage Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { HumanDuration } from '@backstage/types';
|
||||
|
||||
export interface Config {
|
||||
/**
|
||||
* Configuration options for notifications-backend
|
||||
*/
|
||||
notifications?: {
|
||||
/**
|
||||
* Concurrency limit for notification sending, defaults to 10
|
||||
*/
|
||||
concurrencyLimit?: number;
|
||||
/**
|
||||
* Throttle duration between notification sending, defaults to 50ms
|
||||
*/
|
||||
throttleInterval?: HumanDuration | string;
|
||||
};
|
||||
}
|
||||
@@ -26,6 +26,7 @@
|
||||
"types": "src/index.ts",
|
||||
"files": [
|
||||
"dist",
|
||||
"config.d.ts",
|
||||
"migrations/**/*.{js,d.ts}"
|
||||
],
|
||||
"scripts": {
|
||||
@@ -49,9 +50,11 @@
|
||||
"@backstage/plugin-notifications-common": "workspace:^",
|
||||
"@backstage/plugin-notifications-node": "workspace:^",
|
||||
"@backstage/plugin-signals-node": "workspace:^",
|
||||
"@backstage/types": "workspace:^",
|
||||
"express": "^4.17.1",
|
||||
"express-promise-router": "^4.1.0",
|
||||
"knex": "^3.0.0",
|
||||
"p-throttle": "^4.1.1",
|
||||
"uuid": "^11.0.0",
|
||||
"winston": "^3.2.1",
|
||||
"yn": "^4.0.0"
|
||||
@@ -68,5 +71,6 @@
|
||||
"@types/supertest": "^2.0.8",
|
||||
"msw": "^1.0.0",
|
||||
"supertest": "^7.0.0"
|
||||
}
|
||||
},
|
||||
"configSchema": "config.d.ts"
|
||||
}
|
||||
|
||||
@@ -48,7 +48,9 @@ import {
|
||||
} from '@backstage/plugin-notifications-common';
|
||||
import { parseEntityOrderFieldParams } from './parseEntityOrderFieldParams';
|
||||
import { getUsersForEntityRef } from './getUsersForEntityRef';
|
||||
import { Config } from '@backstage/config';
|
||||
import { Config, readDurationFromConfig } from '@backstage/config';
|
||||
import { durationToMilliseconds } from '@backstage/types';
|
||||
import pThrottle from 'p-throttle';
|
||||
|
||||
/** @internal */
|
||||
export interface RouterOptions {
|
||||
@@ -82,6 +84,19 @@ export async function createRouter(
|
||||
const WEB_NOTIFICATION_CHANNEL = 'Web';
|
||||
const store = await DatabaseNotificationsStore.create({ database });
|
||||
const frontendBaseUrl = config.getString('app.baseUrl');
|
||||
const concurrencyLimit =
|
||||
config.getOptionalNumber('notifications.concurrencyLimit') ?? 10;
|
||||
const throttleInterval = config.has('notifications.throttleInterval')
|
||||
? durationToMilliseconds(
|
||||
readDurationFromConfig(config, {
|
||||
key: 'notifications.throttleInterval',
|
||||
}),
|
||||
)
|
||||
: 50;
|
||||
const throttle = pThrottle({
|
||||
limit: concurrencyLimit,
|
||||
interval: throttleInterval,
|
||||
});
|
||||
|
||||
const getUser = async (req: Request<unknown>) => {
|
||||
const credentials = await httpAuth.credentials(req, { allow: ['user'] });
|
||||
@@ -470,71 +485,84 @@ export async function createRouter(
|
||||
},
|
||||
channel: 'notifications',
|
||||
});
|
||||
postProcessNotification(ret, opts);
|
||||
}
|
||||
postProcessNotification(ret, opts);
|
||||
return notification;
|
||||
};
|
||||
|
||||
const sendUserNotification = async (
|
||||
baseNotification: Omit<Notification, 'user' | 'id'>,
|
||||
user: string,
|
||||
opts: NotificationSendOptions,
|
||||
origin: string,
|
||||
scope?: string,
|
||||
): Promise<Notification | undefined> => {
|
||||
const userNotification = {
|
||||
...baseNotification,
|
||||
id: uuid(),
|
||||
user,
|
||||
};
|
||||
const notification = await preProcessNotification(userNotification, opts);
|
||||
|
||||
const enabled = await isNotificationsEnabled({
|
||||
user,
|
||||
channel: WEB_NOTIFICATION_CHANNEL,
|
||||
origin: userNotification.origin,
|
||||
});
|
||||
|
||||
let ret = notification;
|
||||
|
||||
if (!enabled) {
|
||||
postProcessNotification(ret, opts);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
let existingNotification;
|
||||
if (scope) {
|
||||
existingNotification = await store.getExistingScopeNotification({
|
||||
user,
|
||||
scope,
|
||||
origin,
|
||||
});
|
||||
}
|
||||
|
||||
if (existingNotification) {
|
||||
const restored = await store.restoreExistingNotification({
|
||||
id: existingNotification.id,
|
||||
notification,
|
||||
});
|
||||
ret = restored ?? notification;
|
||||
} else {
|
||||
await store.saveNotification(notification);
|
||||
}
|
||||
|
||||
if (signals) {
|
||||
await signals.publish<NewNotificationSignal>({
|
||||
recipients: { type: 'user', entityRef: [user] },
|
||||
message: {
|
||||
action: 'new_notification',
|
||||
notification_id: ret.id,
|
||||
},
|
||||
channel: 'notifications',
|
||||
});
|
||||
}
|
||||
postProcessNotification(ret, opts);
|
||||
return ret;
|
||||
};
|
||||
|
||||
const sendUserNotifications = async (
|
||||
baseNotification: Omit<Notification, 'user' | 'id'>,
|
||||
users: string[],
|
||||
opts: NotificationSendOptions,
|
||||
origin: string,
|
||||
) => {
|
||||
const notifications = [];
|
||||
): Promise<Notification[]> => {
|
||||
const { scope } = opts.payload;
|
||||
const uniqueUsers = [...new Set(users)];
|
||||
for (const user of uniqueUsers) {
|
||||
const userNotification = {
|
||||
...baseNotification,
|
||||
id: uuid(),
|
||||
user,
|
||||
};
|
||||
const notification = await preProcessNotification(userNotification, opts);
|
||||
|
||||
const enabled = await isNotificationsEnabled({
|
||||
user,
|
||||
channel: WEB_NOTIFICATION_CHANNEL,
|
||||
origin: userNotification.origin,
|
||||
});
|
||||
|
||||
let ret = notification;
|
||||
if (enabled) {
|
||||
let existingNotification;
|
||||
if (scope) {
|
||||
existingNotification = await store.getExistingScopeNotification({
|
||||
user,
|
||||
scope,
|
||||
origin,
|
||||
});
|
||||
}
|
||||
|
||||
if (existingNotification) {
|
||||
const restored = await store.restoreExistingNotification({
|
||||
id: existingNotification.id,
|
||||
notification,
|
||||
});
|
||||
ret = restored ?? notification;
|
||||
} else {
|
||||
await store.saveNotification(notification);
|
||||
}
|
||||
|
||||
notifications.push(ret);
|
||||
|
||||
if (signals) {
|
||||
await signals.publish<NewNotificationSignal>({
|
||||
recipients: { type: 'user', entityRef: [user] },
|
||||
message: {
|
||||
action: 'new_notification',
|
||||
notification_id: ret.id,
|
||||
},
|
||||
channel: 'notifications',
|
||||
});
|
||||
}
|
||||
}
|
||||
postProcessNotification(ret, opts);
|
||||
}
|
||||
return notifications;
|
||||
const throttled = throttle((user: string) =>
|
||||
sendUserNotification(baseNotification, user, opts, origin, scope),
|
||||
);
|
||||
const sent = await Promise.all(uniqueUsers.map(user => throttled(user)));
|
||||
return sent.filter(n => n !== undefined);
|
||||
};
|
||||
|
||||
const createNotificationHandler = async (
|
||||
|
||||
@@ -7053,12 +7053,14 @@ __metadata:
|
||||
"@backstage/plugin-notifications-node": "workspace:^"
|
||||
"@backstage/plugin-signals-backend": "workspace:^"
|
||||
"@backstage/plugin-signals-node": "workspace:^"
|
||||
"@backstage/types": "workspace:^"
|
||||
"@types/express": ^4.17.6
|
||||
"@types/supertest": ^2.0.8
|
||||
express: ^4.17.1
|
||||
express-promise-router: ^4.1.0
|
||||
knex: ^3.0.0
|
||||
msw: ^1.0.0
|
||||
p-throttle: ^4.1.1
|
||||
supertest: ^7.0.0
|
||||
uuid: ^11.0.0
|
||||
winston: ^3.2.1
|
||||
|
||||
Reference in New Issue
Block a user