Resolve entity's owner users in TbNotificationNode

This commit is contained in:
ViacheslavKlimov 2023-12-06 12:57:44 +02:00
parent d39fff85ef
commit 5191b690b7
12 changed files with 78 additions and 46 deletions

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.notification; package org.thingsboard.server.service.notification;
import com.google.common.util.concurrent.FutureCallback;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -79,7 +80,6 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Service @Service
@ -101,7 +101,7 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
private Map<NotificationDeliveryMethod, NotificationChannel> channels; private Map<NotificationDeliveryMethod, NotificationChannel> channels;
@Override @Override
public NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest request, Consumer<NotificationRequestStats> callback) { public NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest request, FutureCallback<NotificationRequestStats> callback) {
if (request.getRuleId() == null) { if (request.getRuleId() == null) {
if (!rateLimitService.checkRateLimit(LimitedApi.NOTIFICATION_REQUESTS, tenantId)) { if (!rateLimitService.checkRateLimit(LimitedApi.NOTIFICATION_REQUESTS, tenantId)) {
throw new TbRateLimitsException(EntityType.TENANT); throw new TbRateLimitsException(EntityType.TENANT);
@ -200,7 +200,7 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
} }
} }
private void processNotificationRequestAsync(NotificationProcessingContext ctx, List<NotificationTarget> targets, Consumer<NotificationRequestStats> callback) { private void processNotificationRequestAsync(NotificationProcessingContext ctx, List<NotificationTarget> targets, FutureCallback<NotificationRequestStats> callback) {
notificationExecutor.submit(() -> { notificationExecutor.submit(() -> {
NotificationRequestId requestId = ctx.getRequest().getId(); NotificationRequestId requestId = ctx.getRequest().getId();
for (NotificationTarget target : targets) { for (NotificationTarget target : targets) {
@ -208,33 +208,39 @@ public class DefaultNotificationCenter extends AbstractSubscriptionService imple
processForTarget(target, ctx); processForTarget(target, ctx);
} catch (Exception e) { } catch (Exception e) {
log.error("[{}] Failed to process notification request for target {}", requestId, target.getId(), e); log.error("[{}] Failed to process notification request for target {}", requestId, target.getId(), e);
ctx.getStats().setError(e.getMessage());
updateRequestStats(ctx, requestId, ctx.getStats());
if (callback != null) {
callback.onFailure(e);
}
return;
} }
} }
log.debug("[{}] Notification request processing is finished", requestId); log.debug("[{}] Notification request processing is finished", requestId);
NotificationRequestStats stats = ctx.getStats(); NotificationRequestStats stats = ctx.getStats();
try { updateRequestStats(ctx, requestId, stats);
notificationRequestService.updateNotificationRequest(ctx.getTenantId(), requestId, NotificationRequestStatus.SENT, stats);
} catch (Exception e) {
log.error("[{}] Failed to update stats for notification request", requestId, e);
}
if (callback != null) { if (callback != null) {
try { callback.onSuccess(stats);
callback.accept(stats);
} catch (Exception e) {
log.error("Failed to process callback for notification request {}", requestId, e);
}
} }
}); });
} }
private void updateRequestStats(NotificationProcessingContext ctx, NotificationRequestId requestId, NotificationRequestStats stats) {
try {
notificationRequestService.updateNotificationRequest(ctx.getTenantId(), requestId, NotificationRequestStatus.SENT, stats);
} catch (Exception e) {
log.error("[{}] Failed to update stats for notification request", requestId, e);
}
}
private void processForTarget(NotificationTarget target, NotificationProcessingContext ctx) { private void processForTarget(NotificationTarget target, NotificationProcessingContext ctx) {
Iterable<? extends NotificationRecipient> recipients; Iterable<? extends NotificationRecipient> recipients;
switch (target.getConfiguration().getType()) { switch (target.getConfiguration().getType()) {
case PLATFORM_USERS: { case PLATFORM_USERS: {
PlatformUsersNotificationTargetConfig targetConfig = (PlatformUsersNotificationTargetConfig) target.getConfiguration(); PlatformUsersNotificationTargetConfig targetConfig = (PlatformUsersNotificationTargetConfig) target.getConfiguration();
if (targetConfig.getUsersFilter().getType().isForRules()) { if (targetConfig.getUsersFilter().getType().isForRules() && ctx.getRequest().getInfo() instanceof RuleOriginatedNotificationInfo) {
recipients = new PageDataIterable<>(pageLink -> { recipients = new PageDataIterable<>(pageLink -> {
return notificationTargetService.findRecipientsForRuleNotificationTargetConfig(ctx.getTenantId(), targetConfig, (RuleOriginatedNotificationInfo) ctx.getRequest().getInfo(), pageLink); return notificationTargetService.findRecipientsForRuleNotificationTargetConfig(ctx.getTenantId(), targetConfig, (RuleOriginatedNotificationInfo) ctx.getRequest().getInfo(), pageLink);
}, 500); }, 500);

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.server.service.notification; package org.thingsboard.server.service.notification;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.assertj.core.data.Offset; import org.assertj.core.data.Offset;
@ -709,7 +710,17 @@ public class NotificationApiTest extends AbstractNotificationApiTest {
private NotificationRequestStats submitNotificationRequestAndWait(NotificationRequest notificationRequest) throws Exception { private NotificationRequestStats submitNotificationRequestAndWait(NotificationRequest notificationRequest) throws Exception {
SettableFuture<NotificationRequestStats> future = SettableFuture.create(); SettableFuture<NotificationRequestStats> future = SettableFuture.create();
notificationCenter.processNotificationRequest(notificationRequest.getTenantId(), notificationRequest, future::set); notificationCenter.processNotificationRequest(notificationRequest.getTenantId(), notificationRequest, new FutureCallback<>() {
@Override
public void onSuccess(NotificationRequestStats result) {
future.set(result);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
return future.get(30, TimeUnit.SECONDS); return future.get(30, TimeUnit.SECONDS);
} }

View File

@ -19,6 +19,7 @@ import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import java.util.HashMap; import java.util.HashMap;
@ -28,9 +29,10 @@ import java.util.Map;
@AllArgsConstructor @AllArgsConstructor
@NoArgsConstructor @NoArgsConstructor
@Builder @Builder
public class RuleEngineOriginatedNotificationInfo implements NotificationInfo { public class RuleEngineOriginatedNotificationInfo implements RuleOriginatedNotificationInfo {
private EntityId msgOriginator; private EntityId msgOriginator;
private CustomerId msgCustomerId;
private String msgType; private String msgType;
private Map<String, String> msgMetadata; private Map<String, String> msgMetadata;
private Map<String, String> msgData; private Map<String, String> msgData;
@ -43,6 +45,7 @@ public class RuleEngineOriginatedNotificationInfo implements NotificationInfo {
templateData.put("originatorType", msgOriginator.getEntityType().getNormalName()); templateData.put("originatorType", msgOriginator.getEntityType().getNormalName());
templateData.put("originatorId", msgOriginator.getId().toString()); templateData.put("originatorId", msgOriginator.getId().toString());
templateData.put("msgType", msgType); templateData.put("msgType", msgType);
templateData.put("customerId", msgCustomerId != null ? msgCustomerId.getId().toString() : "");
return templateData; return templateData;
} }
@ -51,4 +54,9 @@ public class RuleEngineOriginatedNotificationInfo implements NotificationInfo {
return msgOriginator; return msgOriginator;
} }
@Override
public CustomerId getAffectedCustomerId() {
return msgCustomerId;
}
} }

View File

@ -150,8 +150,9 @@ public class DefaultNotificationTargetService extends AbstractEntityService impl
return userService.findAllUsers(pageLink); return userService.findAllUsers(pageLink);
} }
} }
default:
throw new IllegalArgumentException("Recipient type not supported");
} }
return new PageData<>();
} }
@Override @Override
@ -178,6 +179,8 @@ public class DefaultNotificationTargetService extends AbstractEntityService impl
return userService.findTenantAdmins(affectedTenantId, pageLink); return userService.findTenantAdmins(affectedTenantId, pageLink);
} }
break; break;
default:
throw new IllegalArgumentException("Recipient type not supported");
} }
return new PageData<>(); return new PageData<>();
} }

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.rule.engine.api; package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.server.common.data.id.NotificationId; import org.thingsboard.server.common.data.id.NotificationId;
import org.thingsboard.server.common.data.id.NotificationRequestId; import org.thingsboard.server.common.data.id.NotificationRequestId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
@ -26,11 +27,10 @@ import org.thingsboard.server.common.data.notification.targets.platform.UsersFil
import org.thingsboard.server.common.data.notification.template.NotificationTemplate; import org.thingsboard.server.common.data.notification.template.NotificationTemplate;
import java.util.Set; import java.util.Set;
import java.util.function.Consumer;
public interface NotificationCenter { public interface NotificationCenter {
NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest notificationRequest, Consumer<NotificationRequestStats> callback); NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest notificationRequest, FutureCallback<NotificationRequestStats> callback);
void sendGeneralWebNotification(TenantId tenantId, UsersFilter recipients, NotificationTemplate template); void sendGeneralWebNotification(TenantId tenantId, UsersFilter recipients, NotificationTemplate template);

View File

@ -15,6 +15,7 @@
*/ */
package org.thingsboard.rule.engine.notification; package org.thingsboard.rule.engine.notification;
import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
@ -23,8 +24,10 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.external.TbAbstractExternalNode; import org.thingsboard.rule.engine.external.TbAbstractExternalNode;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestConfig; import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
import org.thingsboard.server.common.data.notification.NotificationRequestStats;
import org.thingsboard.server.common.data.notification.info.RuleEngineOriginatedNotificationInfo; import org.thingsboard.server.common.data.notification.info.RuleEngineOriginatedNotificationInfo;
import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
@ -56,6 +59,8 @@ public class TbNotificationNode extends TbAbstractExternalNode {
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException {
RuleEngineOriginatedNotificationInfo notificationInfo = RuleEngineOriginatedNotificationInfo.builder() RuleEngineOriginatedNotificationInfo notificationInfo = RuleEngineOriginatedNotificationInfo.builder()
.msgOriginator(msg.getOriginator()) .msgOriginator(msg.getOriginator())
.msgCustomerId(msg.getOriginator().getEntityType() == EntityType.CUSTOMER
&& msg.getOriginator().equals(msg.getCustomerId()) ? null : msg.getCustomerId())
.msgMetadata(msg.getMetaData().getData()) .msgMetadata(msg.getMetaData().getData())
.msgData(JacksonUtil.toFlatMap(JacksonUtil.toJsonNode(msg.getData()))) .msgData(JacksonUtil.toFlatMap(JacksonUtil.toJsonNode(msg.getData())))
.msgType(msg.getType()) .msgType(msg.getType())
@ -72,15 +77,23 @@ public class TbNotificationNode extends TbAbstractExternalNode {
var tbMsg = ackIfNeeded(ctx, msg); var tbMsg = ackIfNeeded(ctx, msg);
DonAsynchron.withCallback(ctx.getNotificationExecutor().executeAsync(() -> var callback = new FutureCallback<NotificationRequestStats>() {
ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, stats -> { @Override
TbMsgMetaData metaData = tbMsg.getMetaData().copy(); public void onSuccess(NotificationRequestStats stats) {
metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats)); TbMsgMetaData metaData = tbMsg.getMetaData().copy();
tellSuccess(ctx, TbMsg.transformMsgMetadata(tbMsg, metaData)); metaData.putValue("notificationRequestResult", JacksonUtil.toString(stats));
})), tellSuccess(ctx, TbMsg.transformMsgMetadata(tbMsg, metaData));
r -> { }
},
e -> tellFailure(ctx, tbMsg, e)); @Override
public void onFailure(Throwable e) {
tellFailure(ctx, tbMsg, e);
}
};
var future = ctx.getNotificationExecutor().executeAsync(() ->
ctx.getNotificationCenter().processNotificationRequest(ctx.getTenantId(), notificationRequest, callback));
DonAsynchron.withCallback(future, r -> {}, callback::onFailure);
} }
} }

View File

@ -442,14 +442,12 @@ export const NotificationTargetConfigTypeInfoMap = new Map<NotificationTargetCon
], ],
[NotificationTargetConfigType.ORIGINATOR_ENTITY_OWNER_USERS, [NotificationTargetConfigType.ORIGINATOR_ENTITY_OWNER_USERS,
{ {
name: 'notification.recipient-type.users-entity-owner', name: 'notification.recipient-type.users-entity-owner'
hint: 'notification.recipient-type.users-entity-owner-hint'
} }
], ],
[NotificationTargetConfigType.AFFECTED_USER, [NotificationTargetConfigType.AFFECTED_USER,
{ {
name: 'notification.recipient-type.affected-user', name: 'notification.recipient-type.affected-user'
hint: 'notification.recipient-type.affected-user-hint'
} }
], ],
[NotificationTargetConfigType.SYSTEM_ADMINISTRATORS, [NotificationTargetConfigType.SYSTEM_ADMINISTRATORS,

View File

@ -13,6 +13,7 @@ Available template parameters:
* values from the incoming message data referenced using the data key name; * values from the incoming message data referenced using the data key name;
* `originatorType` - type of the originator, e.g. 'Device'; * `originatorType` - type of the originator, e.g. 'Device';
* `originatorId` - id of the originator * `originatorId` - id of the originator
* `customerId` - id of the customer if any
* `msgType` - type of the message * `msgType` - type of the message
* `recipientTitle` - title of the recipient (first and last name if specified, email otherwise); * `recipientTitle` - title of the recipient (first and last name if specified, email otherwise);
* `recipientEmail` - email of the recipient; * `recipientEmail` - email of the recipient;

View File

@ -3331,15 +3331,13 @@
"recipient-type": { "recipient-type": {
"affected-tenant-administrators": "Affected tenant administrators", "affected-tenant-administrators": "Affected tenant administrators",
"affected-user": "Affected user", "affected-user": "Affected user",
"affected-user-hint": "Affected user hint",
"all-users": "All users", "all-users": "All users",
"customer-users": "Customer users", "customer-users": "Customer users",
"system-administrators": "System administrators", "system-administrators": "System administrators",
"tenant-administrators": "Tenant administrators", "tenant-administrators": "Tenant administrators",
"user-filters": "User filter", "user-filters": "User filter",
"user-list": "User list", "user-list": "User list",
"users-entity-owner": "Users of the entity owner", "users-entity-owner": "Users of the entity owner"
"users-entity-owner-hint": "Users of the entity owner hint"
}, },
"recipients": "Recipients", "recipients": "Recipients",
"notification-recipients": "Notifications / Recipients", "notification-recipients": "Notifications / Recipients",

View File

@ -3247,15 +3247,13 @@
"recipient-type": { "recipient-type": {
"affected-tenant-administrators": "Administradores afectados", "affected-tenant-administrators": "Administradores afectados",
"affected-user": "Usuario afectado", "affected-user": "Usuario afectado",
"affected-user-hint": "Sugerencia en usuario afectado",
"all-users": "Todos los usuarios", "all-users": "Todos los usuarios",
"customer-users": "Usuarios del cliente", "customer-users": "Usuarios del cliente",
"system-administrators": "Administradores del sistema", "system-administrators": "Administradores del sistema",
"tenant-administrators": "Administradores de propietarios", "tenant-administrators": "Administradores de propietarios",
"user-filters": "Filtro de usuarios", "user-filters": "Filtro de usuarios",
"user-list": "Lista de usuarios", "user-list": "Lista de usuarios",
"users-entity-owner": "Usuarios que sean propietarios de la entidad", "users-entity-owner": "Usuarios que sean propietarios de la entidad"
"users-entity-owner-hint": "Sugerencia en usuarios propietarios de la entidad"
}, },
"recipients": "Destinatarios", "recipients": "Destinatarios",
"notification-recipients": "Notificaciones / Destinatarios", "notification-recipients": "Notificaciones / Destinatarios",

View File

@ -3902,7 +3902,6 @@
"recipient-type": { "recipient-type": {
"affected-tenant-administrators": "Betrokken tenantbeheerders", "affected-tenant-administrators": "Betrokken tenantbeheerders",
"affected-user": "Betrokken gebruiker", "affected-user": "Betrokken gebruiker",
"affected-user-hint": "Betrokken gebruikershint",
"all-users": "Alle gebruikers", "all-users": "Alle gebruikers",
"customer-users": "Klant-gebruikers", "customer-users": "Klant-gebruikers",
"system-administrators": "Systeembeheerders", "system-administrators": "Systeembeheerders",
@ -3911,8 +3910,7 @@
"user-group-list": "Lijst met gebruikersgroepen", "user-group-list": "Lijst met gebruikersgroepen",
"user-list": "Lijst met gebruikers", "user-list": "Lijst met gebruikers",
"user-role": "Rol van de gebruiker", "user-role": "Rol van de gebruiker",
"users-entity-owner": "Gebruikers van de entiteitseigenaar", "users-entity-owner": "Gebruikers van de entiteitseigenaar"
"users-entity-owner-hint": "Hint voor gebruikers van de eigenaar van de entiteit"
}, },
"recipients": "Ontvangers", "recipients": "Ontvangers",
"notification-recipients": "Meldingen / Ontvangers", "notification-recipients": "Meldingen / Ontvangers",
@ -6950,4 +6948,4 @@
"zh_TW": "繁體中文" "zh_TW": "繁體中文"
} }
} }
} }

View File

@ -2915,15 +2915,13 @@
"recipient-type": { "recipient-type": {
"affected-tenant-administrators": "受影响的租户管理员", "affected-tenant-administrators": "受影响的租户管理员",
"affected-user": "受影响的用户", "affected-user": "受影响的用户",
"affected-user-hint": "受影响用户的提示",
"all-users": "所有用户", "all-users": "所有用户",
"customer-users": "客户用户", "customer-users": "客户用户",
"system-administrators": "系统管理员", "system-administrators": "系统管理员",
"tenant-administrators": "租户管理员", "tenant-administrators": "租户管理员",
"user-filters": "用户筛选器", "user-filters": "用户筛选器",
"user-list": "用户列表", "user-list": "用户列表",
"users-entity-owner": "实体所有者的用户", "users-entity-owner": "实体所有者的用户"
"users-entity-owner-hint": "实体所有者用户的提示"
}, },
"recipients": "收件人", "recipients": "收件人",
"notification-recipients": "通知 / 收件人", "notification-recipients": "通知 / 收件人",