Notification rules processing; refactoring

This commit is contained in:
ViacheslavKlimov 2022-11-08 16:04:50 +02:00
parent b73e89d691
commit 165e86c82b
35 changed files with 379 additions and 107 deletions

View File

@ -19,12 +19,18 @@ CREATE TABLE IF NOT EXISTS notification_target (
created_time BIGINT NOT NULL, created_time BIGINT NOT NULL,
tenant_id UUID NOT NULL, tenant_id UUID NOT NULL,
name VARCHAR(255) NOT NULL, name VARCHAR(255) NOT NULL,
configuration varchar(1000) NOT NULL configuration VARCHAR(1000) NOT NULL
); );
CREATE INDEX IF NOT EXISTS idx_notification_target_tenant_id_created_time ON notification_target(tenant_id, created_time DESC); CREATE INDEX IF NOT EXISTS idx_notification_target_tenant_id_created_time ON notification_target(tenant_id, created_time DESC);
CREATE TABLE IF NOT EXISTS notification_rule ( CREATE TABLE IF NOT EXISTS notification_rule (
id UUID NOT NULL CONSTRAINT notification_rule_pkey PRIMARY KEY id UUID NOT NULL CONSTRAINT notification_rule_pkey PRIMARY KEY,
created_time BIGINT NOT NULL,
tenant_id UUID NOT NULL,
name VARCHAR(255) NOT NULL,
notification_text_template VARCHAR NOT NULL,
initial_notification_target_id UUID NULL CONSTRAINT fk_notification_rule_target_id REFERENCES notification_target(id),
escalation_config VARCHAR(500)
); );
ALTER TABLE alarm ADD COLUMN IF NOT EXISTS notification_rule_id UUID; ALTER TABLE alarm ADD COLUMN IF NOT EXISTS notification_rule_id UUID;

View File

@ -0,0 +1,81 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.notification.NotificationRuleService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource;
import java.util.UUID;
@RestController
@TbCoreComponent
@RequestMapping("/api")
@RequiredArgsConstructor
@Slf4j
public class NotificationRuleController extends BaseController {
// todo: logEntityAction
private final NotificationRuleService notificationRuleService;
@PostMapping("/notification/rule")
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
public NotificationRule saveNotificationRule(@RequestBody NotificationRule notificationRule,
@AuthenticationPrincipal SecurityUser user) throws ThingsboardException {
// accessControlService.checkPermission(user, Resource.NOTIFICATION_RULE, notificationRule.getId() == null ? Operation.CREATE : Operation.WRITE, notificationRule.getId(), );
return notificationRuleService.saveNotificationRule(user.getTenantId(), notificationRule);
}
@GetMapping("/notification/rule/{id}")
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
public NotificationRule getNotificationRuleById(@PathVariable UUID id,
@AuthenticationPrincipal SecurityUser user) throws ThingsboardException {
NotificationRuleId notificationRuleId = new NotificationRuleId(id);
NotificationRule notificationRule = notificationRuleService.findNotificationRuleById(user.getTenantId(), notificationRuleId);
accessControlService.checkPermission(user, Resource.NOTIFICATION_RULE, Operation.READ, notificationRuleId, notificationRule);
return notificationRule;
}
@GetMapping("/notification/rules")
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
public PageData<NotificationRule> getNotificationRules(@RequestParam int pageSize,
@RequestParam int page,
@RequestParam(required = false) String textSearch,
@RequestParam(required = false) String sortProperty,
@RequestParam(required = false) String sortOrder,
@AuthenticationPrincipal SecurityUser user) throws ThingsboardException {
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
return notificationRuleService.findNotificationRulesByTenantId(user.getTenantId(), pageLink);
}
}

View File

@ -32,20 +32,17 @@ import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.NotificationTargetId; import org.thingsboard.server.common.data.id.NotificationTargetId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.notification.targets.NotificationTarget; import org.thingsboard.server.common.data.notification.targets.NotificationTarget;
import org.thingsboard.server.common.data.notification.targets.NotificationTargetConfig; import org.thingsboard.server.common.data.notification.targets.NotificationTargetConfig;
import org.thingsboard.server.common.data.notification.targets.NotificationTargetConfigType; import org.thingsboard.server.common.data.notification.targets.NotificationTargetConfigType;
import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.notification.NotificationTargetService; import org.thingsboard.server.dao.notification.NotificationTargetService;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource; import org.thingsboard.server.service.security.permission.Resource;
import java.util.List;
import java.util.UUID; import java.util.UUID;
@RestController @RestController
@ -120,7 +117,7 @@ public class NotificationTargetController extends BaseController {
@RequestParam(required = false) String sortOrder, @RequestParam(required = false) String sortOrder,
@AuthenticationPrincipal SecurityUser user) throws ThingsboardException { @AuthenticationPrincipal SecurityUser user) throws ThingsboardException {
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
return notificationTargetService.findNotificationTargetsByTenantIdAndPageLink(user.getTenantId(), pageLink); return notificationTargetService.findNotificationTargetsByTenantId(user.getTenantId(), pageLink);
} }
@DeleteMapping("/target/{id}") @DeleteMapping("/target/{id}")

View File

@ -41,9 +41,9 @@ import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.security.model.UserPrincipal; import org.thingsboard.server.service.security.model.UserPrincipal;
import org.thingsboard.server.service.ws.SessionEvent; import org.thingsboard.server.service.ws.SessionEvent;
import org.thingsboard.server.service.ws.WebSocketMsgEndpoint; import org.thingsboard.server.service.ws.WebSocketMsgEndpoint;
import org.thingsboard.server.service.ws.WebSocketSessionType;
import org.thingsboard.server.service.ws.WebSocketService; import org.thingsboard.server.service.ws.WebSocketService;
import org.thingsboard.server.service.ws.WebSocketSessionRef; import org.thingsboard.server.service.ws.WebSocketSessionRef;
import org.thingsboard.server.service.ws.WebSocketSessionType;
import javax.websocket.RemoteEndpoint; import javax.websocket.RemoteEndpoint;
import javax.websocket.SendHandler; import javax.websocket.SendHandler;
@ -58,6 +58,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.thingsboard.server.service.ws.DefaultWebSocketService.NUMBER_OF_PING_ATTEMPTS; import static org.thingsboard.server.service.ws.DefaultWebSocketService.NUMBER_OF_PING_ATTEMPTS;
@ -215,7 +216,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
private final RemoteEndpoint.Async asyncRemote; private final RemoteEndpoint.Async asyncRemote;
private final WebSocketSessionRef sessionRef; private final WebSocketSessionRef sessionRef;
private volatile boolean isSending = false; private final AtomicBoolean isSending = new AtomicBoolean(false);
private final Queue<TbWebSocketMsg<?>> msgQueue; private final Queue<TbWebSocketMsg<?>> msgQueue;
private volatile long lastActivityTime; private volatile long lastActivityTime;
@ -262,7 +263,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
synchronized void sendMsg(TbWebSocketMsg<?> msg) { synchronized void sendMsg(TbWebSocketMsg<?> msg) {
if (isSending) { if (isSending.compareAndSet(false, true)) {
sendMsgInternal(msg);
} else {
try { try {
msgQueue.add(msg); msgQueue.add(msg);
} catch (RuntimeException e) { } catch (RuntimeException e) {
@ -273,9 +276,6 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
} }
closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!")); closeSession(CloseStatus.POLICY_VIOLATION.withReason("Max pending updates limit reached!"));
} }
} else {
isSending = true;
sendMsgInternal(msg);
} }
} }
@ -310,7 +310,7 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements WebSocke
if (msg != null) { if (msg != null) {
sendMsgInternal(msg); sendMsgInternal(msg);
} else { } else {
isSending = false; isSending.set(false);
} }
} }
} }

View File

@ -39,8 +39,10 @@ import org.thingsboard.server.dao.notification.NotificationRequestService;
import org.thingsboard.server.dao.notification.NotificationService; import org.thingsboard.server.dao.notification.NotificationService;
import org.thingsboard.server.dao.notification.NotificationTargetService; import org.thingsboard.server.dao.notification.NotificationTargetService;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.NotificationsTopicService; import org.thingsboard.server.queue.discovery.NotificationsTopicService;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import org.thingsboard.server.service.telemetry.AbstractSubscriptionService; import org.thingsboard.server.service.telemetry.AbstractSubscriptionService;
@ -61,24 +63,27 @@ public class DefaultNotificationManager extends AbstractSubscriptionService impl
private final NotificationService notificationService; private final NotificationService notificationService;
private final DbCallbackExecutorService dbCallbackExecutorService; private final DbCallbackExecutorService dbCallbackExecutorService;
private final NotificationsTopicService notificationsTopicService; private final NotificationsTopicService notificationsTopicService;
private final TbQueueProducerProvider producerProvider;
public DefaultNotificationManager(TbClusterService clusterService, PartitionService partitionService, public DefaultNotificationManager(TbClusterService clusterService, PartitionService partitionService,
NotificationTargetService notificationTargetService, NotificationTargetService notificationTargetService,
NotificationRequestService notificationRequestService, NotificationRequestService notificationRequestService,
NotificationService notificationService, NotificationService notificationService,
DbCallbackExecutorService dbCallbackExecutorService, DbCallbackExecutorService dbCallbackExecutorService,
NotificationsTopicService notificationsTopicService) { NotificationsTopicService notificationsTopicService,
TbQueueProducerProvider producerProvider) {
super(clusterService, partitionService); super(clusterService, partitionService);
this.notificationTargetService = notificationTargetService; this.notificationTargetService = notificationTargetService;
this.notificationRequestService = notificationRequestService; this.notificationRequestService = notificationRequestService;
this.notificationService = notificationService; this.notificationService = notificationService;
this.dbCallbackExecutorService = dbCallbackExecutorService; this.dbCallbackExecutorService = dbCallbackExecutorService;
this.notificationsTopicService = notificationsTopicService; this.notificationsTopicService = notificationsTopicService;
this.producerProvider = producerProvider;
} }
@Override @Override
public NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest notificationRequest) { public NotificationRequest processNotificationRequest(TenantId tenantId, NotificationRequest notificationRequest) {
log.info("Processing notification request (tenant id: {}, notification target id: {})", tenantId, notificationRequest.getTargetId()); log.debug("Processing notification request (tenant id: {}, notification target id: {})", tenantId, notificationRequest.getTargetId());
notificationRequest.setTenantId(tenantId); notificationRequest.setTenantId(tenantId);
if (notificationRequest.getAdditionalConfig() != null) { if (notificationRequest.getAdditionalConfig() != null) {
NotificationRequestConfig config = notificationRequest.getAdditionalConfig(); NotificationRequestConfig config = notificationRequest.getAdditionalConfig();
@ -201,11 +206,11 @@ public class DefaultNotificationManager extends AbstractSubscriptionService impl
private void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update) { private void onNotificationRequestUpdate(TenantId tenantId, NotificationRequestUpdate update) {
log.trace("Submitting notification request update: {}", update); log.trace("Submitting notification request update: {}", update);
wsCallBackExecutor.submit(() -> { wsCallBackExecutor.submit(() -> {
TransportProtos.ToCoreMsg notificationRequestDeletedProto = TbSubscriptionUtils.notificationRequestUpdateToProto(tenantId, update); TransportProtos.ToCoreNotificationMsg notificationRequestUpdateProto = TbSubscriptionUtils.notificationRequestUpdateToProto(tenantId, update);
Set<String> coreServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_CORE)); Set<String> coreServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_CORE));
for (String serviceId : coreServices) { for (String serviceId : coreServices) {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
clusterService.pushMsgToCore(tpi, UUID.randomUUID(), notificationRequestDeletedProto, null); producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), notificationRequestUpdateProto), null);
} }
}); });
} }

View File

@ -29,7 +29,6 @@ import org.thingsboard.server.common.data.notification.NotificationInfo;
import org.thingsboard.server.common.data.notification.NotificationOriginatorType; import org.thingsboard.server.common.data.notification.NotificationOriginatorType;
import org.thingsboard.server.common.data.notification.NotificationRequest; import org.thingsboard.server.common.data.notification.NotificationRequest;
import org.thingsboard.server.common.data.notification.NotificationRequestConfig; import org.thingsboard.server.common.data.notification.NotificationRequestConfig;
import org.thingsboard.server.common.data.notification.NotificationRequestStatus;
import org.thingsboard.server.common.data.notification.NotificationSeverity; import org.thingsboard.server.common.data.notification.NotificationSeverity;
import org.thingsboard.server.common.data.notification.rule.NonConfirmedNotificationEscalation; import org.thingsboard.server.common.data.notification.rule.NonConfirmedNotificationEscalation;
import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.NotificationRule;
@ -51,42 +50,54 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
private final DbCallbackExecutorService dbCallbackExecutorService; private final DbCallbackExecutorService dbCallbackExecutorService;
@Override @Override
public ListenableFuture<?> onAlarmCreatedOrUpdated(TenantId tenantId, Alarm alarm) { public ListenableFuture<Void> onAlarmCreatedOrUpdated(TenantId tenantId, Alarm alarm) {
return processAlarmUpdate(tenantId, alarm); return processAlarmUpdate(tenantId, alarm, false);
} }
@Override @Override
public ListenableFuture<?> onAlarmAcknowledged(TenantId tenantId, Alarm alarm) { public ListenableFuture<Void> onAlarmDeleted(TenantId tenantId, Alarm alarm) {
return processAlarmUpdate(tenantId, alarm); return processAlarmUpdate(tenantId, alarm, true);
} }
private ListenableFuture<?> processAlarmUpdate(TenantId tenantId, Alarm alarm) { private ListenableFuture<Void> processAlarmUpdate(TenantId tenantId, Alarm alarm, boolean deleted) {
if (alarm.getNotificationRuleId() == null) return Futures.immediateFuture(null); if (alarm.getNotificationRuleId() == null) return Futures.immediateFuture(null);
return dbCallbackExecutorService.submit(() -> { return dbCallbackExecutorService.submit(() -> {
onAlarmUpdate(tenantId, alarm.getNotificationRuleId(), alarm); onAlarmUpdate(tenantId, alarm.getNotificationRuleId(), alarm, deleted);
return null;
}); });
} }
private void onAlarmUpdate(TenantId tenantId, NotificationRuleId notificationRuleId, Alarm alarm) { @Override
public ListenableFuture<Void> onNotificationRuleDeleted(TenantId tenantId, NotificationRuleId ruleId) {
return dbCallbackExecutorService.submit(() -> {
// need to remove fk constraint in notificationRequest to rule
// todo: do we need to remove all notifications when notification request is deleted?
return null;
});
}
// todo: think about: what if notification rule was updated?
private void onAlarmUpdate(TenantId tenantId, NotificationRuleId notificationRuleId, Alarm alarm, boolean deleted) {
List<NotificationRequest> notificationRequests = notificationRequestService.findNotificationRequestsByRuleIdAndOriginatorEntityId(tenantId, notificationRuleId, alarm.getId()); List<NotificationRequest> notificationRequests = notificationRequestService.findNotificationRequestsByRuleIdAndOriginatorEntityId(tenantId, notificationRuleId, alarm.getId());
NotificationRule notificationRule = notificationRuleService.findNotificationRuleById(tenantId, notificationRuleId); NotificationRule notificationRule = notificationRuleService.findNotificationRuleById(tenantId, notificationRuleId);
if (notificationRule == null) return;
if (notificationRequests.isEmpty()) { // in case it is first notification for alarm, or it was previously acked and now we need to send notifications again if (alarmAcknowledged(alarm) || deleted) {
NotificationTargetId initialNotificationTargetId = notificationRule.getInitialNotificationTargetId();
submitNotificationRequest(tenantId, initialNotificationTargetId, notificationRule, alarm, 0);
for (NonConfirmedNotificationEscalation escalation : notificationRule.getEscalations()) {
submitNotificationRequest(tenantId, escalation.getNotificationTargetId(), notificationRule, alarm, escalation.getDelayInMinutes());
}
} else {
if (alarmAcknowledged(alarm)) {
for (NotificationRequest notificationRequest : notificationRequests) { for (NotificationRequest notificationRequest : notificationRequests) {
if (notificationRequest.getStatus() == NotificationRequestStatus.SCHEDULED) {
// using regular service due to no need to send an update to subscription manager
notificationRequestService.deleteNotificationRequestById(tenantId, notificationRequest.getId());
} else {
notificationManager.deleteNotificationRequest(tenantId, notificationRequest.getId()); notificationManager.deleteNotificationRequest(tenantId, notificationRequest.getId());
// todo: or should we mark already sent notifications as read? // todo: or should we mark already sent notifications as read and delete only scheduled?
}
return;
}
if (notificationRequests.isEmpty()) {
NotificationTargetId initialNotificationTargetId = notificationRule.getInitialNotificationTargetId();
if (initialNotificationTargetId != null) {
submitNotificationRequest(tenantId, initialNotificationTargetId, notificationRule, alarm, 0);
}
if (notificationRule.getEscalationConfig() != null) {
for (NonConfirmedNotificationEscalation escalation : notificationRule.getEscalationConfig().getEscalations()) {
submitNotificationRequest(tenantId, escalation.getNotificationTargetId(), notificationRule, alarm, escalation.getDelayInMinutes());
} }
} }
} else { } else {
@ -97,8 +108,6 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
notificationRequest.setNotificationInfo(newNotificationInfo); notificationRequest.setNotificationInfo(newNotificationInfo);
notificationManager.updateNotificationRequest(tenantId, notificationRequest); notificationManager.updateNotificationRequest(tenantId, notificationRequest);
} }
// fixme: no need to send an update event for scheduled requests, only for sent
}
} }
} }
} }

View File

@ -17,12 +17,15 @@ package org.thingsboard.server.service.notification;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
public interface NotificationRuleProcessingService { public interface NotificationRuleProcessingService {
ListenableFuture<?> onAlarmCreatedOrUpdated(TenantId tenantId, Alarm alarm); ListenableFuture<Void> onAlarmCreatedOrUpdated(TenantId tenantId, Alarm alarm);
ListenableFuture<?> onAlarmAcknowledged(TenantId tenantId, Alarm alarm); ListenableFuture<Void> onAlarmDeleted(TenantId tenantId, Alarm alarm);
ListenableFuture<Void> onNotificationRuleDeleted(TenantId tenantId, NotificationRuleId ruleId);
} }

View File

@ -347,6 +347,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreNotification.hasVcResponseMsg()) { } else if (toCoreNotification.hasVcResponseMsg()) {
vcQueueService.processResponse(toCoreNotification.getVcResponseMsg()); vcQueueService.processResponse(toCoreNotification.getVcResponseMsg());
callback.onSuccess(); callback.onSuccess();
} else if (toCoreNotification.hasToSubscriptionMgrMsg()) {
forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback);
} }
if (statsEnabled) { if (statsEnabled) {
stats.log(toCoreNotification); stats.log(toCoreNotification);

View File

@ -45,7 +45,8 @@ public enum Resource {
QUEUE(EntityType.QUEUE), QUEUE(EntityType.QUEUE),
VERSION_CONTROL, VERSION_CONTROL,
NOTIFICATION_TARGET(EntityType.NOTIFICATION_TARGET), NOTIFICATION_TARGET(EntityType.NOTIFICATION_TARGET),
NOTIFICATION_REQUEST(EntityType.NOTIFICATION_REQUEST); NOTIFICATION_REQUEST(EntityType.NOTIFICATION_REQUEST),
NOTIFICATION_RULE(EntityType.NOTIFICATION_RULE);
private final EntityType entityType; private final EntityType entityType;

View File

@ -51,6 +51,7 @@ public class TenantAdminPermissions extends AbstractPermissions {
put(Resource.VERSION_CONTROL, PermissionChecker.allowAllPermissionChecker); put(Resource.VERSION_CONTROL, PermissionChecker.allowAllPermissionChecker);
put(Resource.NOTIFICATION_TARGET, tenantEntityPermissionChecker); put(Resource.NOTIFICATION_TARGET, tenantEntityPermissionChecker);
put(Resource.NOTIFICATION_REQUEST, tenantEntityPermissionChecker); put(Resource.NOTIFICATION_REQUEST, tenantEntityPermissionChecker);
put(Resource.NOTIFICATION_RULE, tenantEntityPermissionChecker);
} }
public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() { public static final PermissionChecker tenantEntityPermissionChecker = new PermissionChecker() {

View File

@ -331,6 +331,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
s -> alarm, s -> alarm,
true true
); );
notificationRuleProcessingService.onAlarmDeleted(tenantId, alarm);
callback.onSuccess(); callback.onSuccess();
} }

View File

@ -50,6 +50,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeletePr
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationRequestUpdate;
import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate; import org.thingsboard.server.service.ws.notification.sub.NotificationUpdate;
@ -380,7 +381,7 @@ public class TbSubscriptionUtils {
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
} }
public static TransportProtos.ToCoreNotificationMsg notificationsSubUpdateToProto(TbSubscription subscription, NotificationsSubscriptionUpdate update) { public static ToCoreNotificationMsg notificationsSubUpdateToProto(TbSubscription subscription, NotificationsSubscriptionUpdate update) {
TransportProtos.NotificationsSubscriptionUpdateProto.Builder updateProto = TransportProtos.NotificationsSubscriptionUpdateProto.newBuilder() TransportProtos.NotificationsSubscriptionUpdateProto.Builder updateProto = TransportProtos.NotificationsSubscriptionUpdateProto.newBuilder()
.setSessionId(subscription.getSessionId()) .setSessionId(subscription.getSessionId())
.setSubscriptionId(subscription.getSubscriptionId()); .setSubscriptionId(subscription.getSubscriptionId());
@ -390,7 +391,7 @@ public class TbSubscriptionUtils {
if (update.getNotificationRequestUpdate() != null) { if (update.getNotificationRequestUpdate() != null) {
updateProto.setNotificationRequestUpdate(JacksonUtil.toString(update.getNotificationRequestUpdate())); updateProto.setNotificationRequestUpdate(JacksonUtil.toString(update.getNotificationRequestUpdate()));
} }
return TransportProtos.ToCoreNotificationMsg.newBuilder() return ToCoreNotificationMsg.newBuilder()
.setToLocalSubscriptionServiceMsg(TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder() .setToLocalSubscriptionServiceMsg(TransportProtos.LocalSubscriptionServiceMsgProto.newBuilder()
.setNotificationsSubUpdate(updateProto) .setNotificationsSubUpdate(updateProto)
.build()) .build())
@ -412,13 +413,13 @@ public class TbSubscriptionUtils {
.build(); .build();
} }
public static ToCoreMsg notificationRequestUpdateToProto(TenantId tenantId, NotificationRequestUpdate notificationRequestUpdate) { public static ToCoreNotificationMsg notificationRequestUpdateToProto(TenantId tenantId, NotificationRequestUpdate notificationRequestUpdate) {
TransportProtos.NotificationRequestUpdateProto updateProto = TransportProtos.NotificationRequestUpdateProto.newBuilder() TransportProtos.NotificationRequestUpdateProto updateProto = TransportProtos.NotificationRequestUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setUpdate(JacksonUtil.toString(notificationRequestUpdate)) .setUpdate(JacksonUtil.toString(notificationRequestUpdate))
.build(); .build();
return ToCoreMsg.newBuilder() return ToCoreNotificationMsg.newBuilder()
.setToSubscriptionMgrMsg(SubscriptionMgrMsgProto.newBuilder() .setToSubscriptionMgrMsg(SubscriptionMgrMsgProto.newBuilder()
.setNotificationRequestUpdate(updateProto) .setNotificationRequestUpdate(updateProto)
.build()) .build())

View File

@ -168,7 +168,6 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
}); });
} }
}); });
// todo: handle notification rule
} }
private void onAlarmDeleted(AlarmOperationResult result) { private void onAlarmDeleted(AlarmOperationResult result) {

View File

@ -105,8 +105,10 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
log.trace("[{}, subId: {}] Fetching unread notifications from DB", subscription.getSessionId(), subscription.getSubscriptionId()); log.trace("[{}, subId: {}] Fetching unread notifications from DB", subscription.getSessionId(), subscription.getSubscriptionId());
PageData<Notification> notifications = notificationService.findLatestUnreadNotificationsByUserId(subscription.getTenantId(), PageData<Notification> notifications = notificationService.findLatestUnreadNotificationsByUserId(subscription.getTenantId(),
(UserId) subscription.getEntityId(), subscription.getLimit()); (UserId) subscription.getEntityId(), subscription.getLimit());
subscription.getUnreadNotifications().clear(); subscription.getLatestUnreadNotifications().clear();
subscription.getUnreadNotifications().putAll(notifications.getData().stream().collect(Collectors.toMap(IdBased::getUuidId, n -> n))); notifications.getData().forEach(notification -> {
subscription.getLatestUnreadNotifications().put(notification.getUuidId(), notification);
});
subscription.getTotalUnreadCounter().set((int) notifications.getTotalElements()); subscription.getTotalUnreadCounter().set((int) notifications.getTotalElements());
} }
@ -129,19 +131,30 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
private void handleNotificationUpdate(NotificationsSubscription subscription, NotificationUpdate update) { private void handleNotificationUpdate(NotificationsSubscription subscription, NotificationUpdate update) {
log.trace("[{}, subId: {}] Handling notification update: {}", subscription.getSessionId(), subscription.getSubscriptionId(), update); log.trace("[{}, subId: {}] Handling notification update: {}", subscription.getSessionId(), subscription.getSubscriptionId(), update);
Notification notification = update.getNotification(); Notification notification = update.getNotification();
if (notification.getStatus() == NotificationStatus.READ) { if (update.isNew()) {
subscription.getLatestUnreadNotifications().put(notification.getUuidId(), notification);
subscription.getTotalUnreadCounter().incrementAndGet();
if (subscription.getLatestUnreadNotifications().size() > subscription.getLimit()) {
Set<UUID> beyondLimit = subscription.getSortedNotifications().stream().skip(subscription.getLimit())
.map(IdBased::getUuidId).collect(Collectors.toSet());
beyondLimit.forEach(notificationId -> subscription.getLatestUnreadNotifications().remove(notificationId));
}
sendUpdate(subscription.getSessionId(), subscription.createPartialUpdate(notification));
} else {
if (notification.getStatus() != NotificationStatus.READ) {
if (subscription.getLatestUnreadNotifications().containsKey(notification.getUuidId())) {
subscription.getLatestUnreadNotifications().put(notification.getUuidId(), notification);
sendUpdate(subscription.getSessionId(), subscription.createPartialUpdate(notification));
}
} else {
if (subscription.getLatestUnreadNotifications().containsKey(notification.getUuidId())) {
fetchUnreadNotifications(subscription); fetchUnreadNotifications(subscription);
sendUpdate(subscription.getSessionId(), subscription.createFullUpdate()); sendUpdate(subscription.getSessionId(), subscription.createFullUpdate());
} else { } else {
subscription.getUnreadNotifications().put(notification.getUuidId(), notification); subscription.getTotalUnreadCounter().decrementAndGet();
if (update.isNew()) { sendUpdate(subscription.getSessionId(), subscription.createCountUpdate());
subscription.getTotalUnreadCounter().incrementAndGet(); }
Set<UUID> beyondLimit = subscription.getUnreadNotifications().keySet().stream()
.skip(subscription.getLimit())
.collect(Collectors.toSet());
beyondLimit.forEach(notificationId -> subscription.getUnreadNotifications().remove(notificationId));
} }
sendUpdate(subscription.getSessionId(), subscription.createPartialUpdate(notification));
} }
} }
@ -149,14 +162,14 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
log.trace("[{}, subId: {}] Handling notification request update: {}", subscription.getSessionId(), subscription.getSubscriptionId(), update); log.trace("[{}, subId: {}] Handling notification request update: {}", subscription.getSessionId(), subscription.getSubscriptionId(), update);
NotificationRequestId notificationRequestId = update.getNotificationRequestId(); NotificationRequestId notificationRequestId = update.getNotificationRequestId();
if (update.isDeleted()) { if (update.isDeleted()) {
if (subscription.getUnreadNotifications().values().stream() if (subscription.getLatestUnreadNotifications().values().stream()
.anyMatch(notification -> notification.getRequestId().equals(notificationRequestId))) { .anyMatch(notification -> notification.getRequestId().equals(notificationRequestId))) {
fetchUnreadNotifications(subscription); fetchUnreadNotifications(subscription);
sendUpdate(subscription.getSessionId(), subscription.createFullUpdate()); sendUpdate(subscription.getSessionId(), subscription.createFullUpdate());
} }
} else { } else {
NotificationInfo notificationInfo = update.getNotificationInfo(); NotificationInfo notificationInfo = update.getNotificationInfo();
subscription.getUnreadNotifications().values().stream() subscription.getLatestUnreadNotifications().values().stream()
.filter(notification -> notification.getRequestId().equals(notificationRequestId)) .filter(notification -> notification.getRequestId().equals(notificationRequestId))
.forEach(notification -> { .forEach(notification -> {
notification.setInfo(notificationInfo); notification.setInfo(notificationInfo);
@ -197,7 +210,7 @@ public class DefaultNotificationCommandsHandler implements NotificationCommandsH
private void sendUpdate(String sessionId, CmdUpdate update) { private void sendUpdate(String sessionId, CmdUpdate update) {
log.trace("[{}] Sending WS update: {}", sessionId, update); log.trace("[{}, cmdId: {}] Sending WS update: {}", sessionId, update.getCmdId(), update);
wsService.sendWsMsg(sessionId, update); wsService.sendWsMsg(sessionId, update);
} }

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.service.ws.notification.sub;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.Notification; import org.thingsboard.server.common.data.notification.Notification;
@ -24,16 +25,19 @@ import org.thingsboard.server.service.subscription.TbSubscription;
import org.thingsboard.server.service.subscription.TbSubscriptionType; import org.thingsboard.server.service.subscription.TbSubscriptionType;
import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate; import org.thingsboard.server.service.ws.notification.cmd.UnreadNotificationsUpdate;
import java.util.LinkedHashMap; import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@Getter @Getter
public class NotificationsSubscription extends TbSubscription<NotificationsSubscriptionUpdate> { public class NotificationsSubscription extends TbSubscription<NotificationsSubscriptionUpdate> {
private final Map<UUID, Notification> unreadNotifications = new LinkedHashMap<>(); private final Map<UUID, Notification> latestUnreadNotifications = new HashMap<>();
private final int limit; private final int limit;
private final AtomicInteger totalUnreadCounter = new AtomicInteger(); private final AtomicInteger totalUnreadCounter = new AtomicInteger();
@ -48,11 +52,17 @@ public class NotificationsSubscription extends TbSubscription<NotificationsSubsc
public UnreadNotificationsUpdate createFullUpdate() { public UnreadNotificationsUpdate createFullUpdate() {
return UnreadNotificationsUpdate.builder() return UnreadNotificationsUpdate.builder()
.cmdId(getSubscriptionId()) .cmdId(getSubscriptionId())
.notifications(unreadNotifications.values()) .notifications(getSortedNotifications())
.totalUnreadCount(totalUnreadCounter.get()) .totalUnreadCount(totalUnreadCounter.get())
.build(); .build();
} }
public List<Notification> getSortedNotifications() {
return latestUnreadNotifications.values().stream()
.sorted(Comparator.comparing(BaseData::getCreatedTime, Comparator.reverseOrder()))
.collect(Collectors.toList());
}
public UnreadNotificationsUpdate createPartialUpdate(Notification notification) { public UnreadNotificationsUpdate createPartialUpdate(Notification notification) {
return UnreadNotificationsUpdate.builder() return UnreadNotificationsUpdate.builder()
.cmdId(getSubscriptionId()) .cmdId(getSubscriptionId())
@ -61,4 +71,11 @@ public class NotificationsSubscription extends TbSubscription<NotificationsSubsc
.build(); .build();
} }
public UnreadNotificationsUpdate createCountUpdate() {
return UnreadNotificationsUpdate.builder()
.cmdId(getSubscriptionId())
.totalUnreadCount(totalUnreadCounter.get())
.build();
}
} }

View File

@ -112,20 +112,24 @@ public class TbTestWebSocketClient extends WebSocketClient {
public String waitForUpdate(long ms) { public String waitForUpdate(long ms) {
try { try {
update.await(ms, TimeUnit.MILLISECONDS); if (update.await(ms, TimeUnit.MILLISECONDS)) {
return lastMsg;
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn("Failed to await reply", e); log.warn("Failed to await reply", e);
} }
return lastMsg; return null;
} }
public String waitForReply() { public String waitForReply() {
try { try {
reply.await(3, TimeUnit.SECONDS); if (reply.await(3, TimeUnit.SECONDS)) {
return lastMsg;
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.warn("Failed to await reply", e); log.warn("Failed to await reply", e);
} }
return lastMsg; return null;
} }
public EntityDataUpdate parseDataReply(String msg) { public EntityDataUpdate parseDataReply(String msg) {

View File

@ -108,8 +108,8 @@ public class NotificationApiTest extends AbstractControllerTest {
NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId); NotificationTarget notificationTarget = createNotificationTarget(tenantAdminUserId);
String notificationText = "Notification 1"; String notificationText = "Notification 1";
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText); submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText);
getWsClient().waitForUpdate(); assertThat(getWsClient().waitForUpdate()).isNotNull();
getAnotherWsClient().waitForUpdate(); assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
checkPartialNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText, 1); checkPartialNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText, 1);
checkPartialNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText, 1); checkPartialNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText, 1);
@ -129,16 +129,16 @@ public class NotificationApiTest extends AbstractControllerTest {
getAnotherWsClient().registerWaitForUpdate(2); getAnotherWsClient().registerWaitForUpdate(2);
String notificationText1 = "Notification 1"; String notificationText1 = "Notification 1";
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText1); submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText1);
getWsClient().waitForUpdate(); assertThat(getWsClient().waitForUpdate()).isNotNull();
getAnotherWsClient().waitForUpdate(); assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
Notification notification1 = getWsClient().getLastDataUpdate().getUpdate(); Notification notification1 = getWsClient().getLastDataUpdate().getUpdate();
getWsClient().registerWaitForUpdate(); getWsClient().registerWaitForUpdate();
getAnotherWsClient().registerWaitForUpdate(2); getAnotherWsClient().registerWaitForUpdate(2);
String notificationText2 = "Notification 2"; String notificationText2 = "Notification 2";
submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText2); submitNotificationRequest(notificationTarget.getId(), "Just a test", notificationText2);
getWsClient().waitForUpdate(); assertThat(getWsClient().waitForUpdate()).isNotNull();
getAnotherWsClient().waitForUpdate(); assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
assertThat(getWsClient().getLastDataUpdate().getTotalUnreadCount()).isEqualTo(2); assertThat(getWsClient().getLastDataUpdate().getTotalUnreadCount()).isEqualTo(2);
assertThat(getAnotherWsClient().getLastDataUpdate().getTotalUnreadCount()).isEqualTo(2); assertThat(getAnotherWsClient().getLastDataUpdate().getTotalUnreadCount()).isEqualTo(2);
assertThat(getAnotherWsClient().getLastCountUpdate().getTotalUnreadCount()).isEqualTo(2); assertThat(getAnotherWsClient().getLastCountUpdate().getTotalUnreadCount()).isEqualTo(2);
@ -146,8 +146,8 @@ public class NotificationApiTest extends AbstractControllerTest {
getWsClient().registerWaitForUpdate(); getWsClient().registerWaitForUpdate();
getAnotherWsClient().registerWaitForUpdate(2); getAnotherWsClient().registerWaitForUpdate(2);
getWsClient().markNotificationAsRead(notification1.getUuidId()); getWsClient().markNotificationAsRead(notification1.getUuidId());
getWsClient().waitForUpdate(); assertThat(getWsClient().waitForUpdate()).isNotNull();
getAnotherWsClient().waitForUpdate(); assertThat(getAnotherWsClient().waitForUpdate()).isNotNull();
checkFullNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText2); checkFullNotificationsUpdate(getWsClient().getLastDataUpdate(), notificationText2);
checkFullNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText2); checkFullNotificationsUpdate(getAnotherWsClient().getLastDataUpdate(), notificationText2);

View File

@ -20,6 +20,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.notification.Notification;
import org.thingsboard.server.controller.TbTestWebSocketClient; import org.thingsboard.server.controller.TbTestWebSocketClient;
import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd; import org.thingsboard.server.service.ws.notification.cmd.MarkNotificationsAsReadCmd;
import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper; import org.thingsboard.server.service.ws.notification.cmd.NotificationCmdsWrapper;
@ -31,17 +32,22 @@ import org.thingsboard.server.service.ws.telemetry.cmd.v2.CmdUpdateType;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.UUID; import java.util.UUID;
@Slf4j @Slf4j
@Getter
public class NotificationApiWsClient extends TbTestWebSocketClient { public class NotificationApiWsClient extends TbTestWebSocketClient {
@Getter
private UnreadNotificationsUpdate lastDataUpdate; private UnreadNotificationsUpdate lastDataUpdate;
@Getter
private UnreadNotificationsCountUpdate lastCountUpdate; private UnreadNotificationsCountUpdate lastCountUpdate;
private int limit;
private int unreadCount;
private List<Notification> notifications;
public NotificationApiWsClient(String wsUrl, String token) throws URISyntaxException { public NotificationApiWsClient(String wsUrl, String token) throws URISyntaxException {
super(new URI(wsUrl + "/api/ws/plugins/notifications?token=" + token)); super(new URI(wsUrl + "/api/ws/plugins/notifications?token=" + token));
} }
@ -50,6 +56,7 @@ public class NotificationApiWsClient extends TbTestWebSocketClient {
NotificationCmdsWrapper cmdsWrapper = new NotificationCmdsWrapper(); NotificationCmdsWrapper cmdsWrapper = new NotificationCmdsWrapper();
cmdsWrapper.setUnreadSubCmd(new NotificationsSubCmd(1, limit)); cmdsWrapper.setUnreadSubCmd(new NotificationsSubCmd(1, limit));
sendCmd(cmdsWrapper); sendCmd(cmdsWrapper);
this.limit = limit;
} }
public void subscribeForUnreadNotificationsCount() { public void subscribeForUnreadNotificationsCount() {
@ -75,8 +82,30 @@ public class NotificationApiWsClient extends TbTestWebSocketClient {
CmdUpdateType updateType = CmdUpdateType.valueOf(update.get("cmdUpdateType").asText()); CmdUpdateType updateType = CmdUpdateType.valueOf(update.get("cmdUpdateType").asText());
if (updateType == CmdUpdateType.NOTIFICATIONS) { if (updateType == CmdUpdateType.NOTIFICATIONS) {
lastDataUpdate = JacksonUtil.treeToValue(update, UnreadNotificationsUpdate.class); lastDataUpdate = JacksonUtil.treeToValue(update, UnreadNotificationsUpdate.class);
unreadCount = lastDataUpdate.getTotalUnreadCount();
if (lastDataUpdate.getNotifications() != null) {
notifications = new ArrayList<>(lastDataUpdate.getNotifications());
} else {
Notification notificationUpdate = lastDataUpdate.getUpdate();
boolean updated = false;
for (int i = 0; i < notifications.size(); i++) {
Notification existing = notifications.get(i);
if (existing.getId().equals(notificationUpdate.getId())) {
notifications.set(i, notificationUpdate);
updated = true;
break;
}
}
if (!updated) {
notifications.add(0, notificationUpdate);
if (notifications.size() > limit) {
notifications = notifications.subList(0, limit);
}
}
}
} else if (updateType == CmdUpdateType.NOTIFICATIONS_COUNT) { } else if (updateType == CmdUpdateType.NOTIFICATIONS_COUNT) {
lastCountUpdate = JacksonUtil.treeToValue(update, UnreadNotificationsCountUpdate.class); lastCountUpdate = JacksonUtil.treeToValue(update, UnreadNotificationsCountUpdate.class);
unreadCount = lastCountUpdate.getTotalUnreadCount();
} }
super.onMessage(s); super.onMessage(s);
} }

View File

@ -970,6 +970,7 @@ message ToCoreNotificationMsg {
VersionControlResponseMsg vcResponseMsg = 7; VersionControlResponseMsg vcResponseMsg = 7;
bytes toEdgeSyncRequestMsg = 8; bytes toEdgeSyncRequestMsg = 8;
bytes fromEdgeSyncResponseMsg = 9; bytes fromEdgeSyncResponseMsg = 9;
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10;
} }
/* Messages that are handled by ThingsBoard RuleEngine Service */ /* Messages that are handled by ThingsBoard RuleEngine Service */

View File

@ -18,9 +18,15 @@ package org.thingsboard.server.dao.notification;
import org.thingsboard.server.common.data.id.NotificationRuleId; import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
public interface NotificationRuleService { public interface NotificationRuleService {
NotificationRule saveNotificationRule(TenantId tenantId, NotificationRule notificationRule);
NotificationRule findNotificationRuleById(TenantId tenantId, NotificationRuleId notificationRuleId); NotificationRule findNotificationRuleById(TenantId tenantId, NotificationRuleId notificationRuleId);
PageData<NotificationRule> findNotificationRulesByTenantId(TenantId tenantId, PageLink pageLink);
} }

View File

@ -29,7 +29,7 @@ public interface NotificationTargetService {
NotificationTarget findNotificationTargetById(TenantId tenantId, NotificationTargetId id); NotificationTarget findNotificationTargetById(TenantId tenantId, NotificationTargetId id);
PageData<NotificationTarget> findNotificationTargetsByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink); PageData<NotificationTarget> findNotificationTargetsByTenantId(TenantId tenantId, PageLink pageLink);
PageData<User> findRecipientsForNotificationTarget(TenantId tenantId, NotificationTargetId notificationTargetId, PageLink pageLink); PageData<User> findRecipientsForNotificationTarget(TenantId tenantId, NotificationTargetId notificationTargetId, PageLink pageLink);

View File

@ -21,5 +21,5 @@ package org.thingsboard.server.common.data;
public enum EntityType { public enum EntityType {
TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE,
TENANT_PROFILE, DEVICE_PROFILE, ASSET_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC, QUEUE, TENANT_PROFILE, DEVICE_PROFILE, ASSET_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC, QUEUE,
NOTIFICATION_TARGET, NOTIFICATION_REQUEST; NOTIFICATION_TARGET, NOTIFICATION_REQUEST, NOTIFICATION_RULE;
} }

View File

@ -85,6 +85,8 @@ public class EntityIdFactory {
return new NotificationTargetId(uuid); return new NotificationTargetId(uuid);
case NOTIFICATION_REQUEST: case NOTIFICATION_REQUEST:
return new NotificationRequestId(uuid); return new NotificationRequestId(uuid);
case NOTIFICATION_RULE:
return new NotificationRuleId(uuid);
} }
throw new IllegalArgumentException("EntityType " + type + " is not supported!"); throw new IllegalArgumentException("EntityType " + type + " is not supported!");
} }

View File

@ -21,16 +21,16 @@ import org.thingsboard.server.common.data.EntityType;
import java.util.UUID; import java.util.UUID;
public class NotificationRuleId extends UUIDBased { public class NotificationRuleId extends UUIDBased implements EntityId {
@JsonCreator @JsonCreator
public NotificationRuleId(@JsonProperty("id") UUID id) { public NotificationRuleId(@JsonProperty("id") UUID id) {
super(id); super(id);
} }
// @Override @Override
// public EntityType getEntityType() { public EntityType getEntityType() {
// return EntityType.NOTIFICATION_TARGET; return EntityType.NOTIFICATION_RULE;
// } }
} }

View File

@ -56,7 +56,7 @@ public class NotificationRequest extends BaseData<NotificationRequestId> impleme
private NotificationOriginatorType originatorType; private NotificationOriginatorType originatorType;
private EntityId originatorEntityId; // userId, alarmId or tenantId private EntityId originatorEntityId; // userId, alarmId or tenantId
private NotificationRuleId ruleId; // maybe move to child class private NotificationRuleId ruleId;
private NotificationRequestConfig additionalConfig; private NotificationRequestConfig additionalConfig;
private NotificationRequestStatus status; private NotificationRequestStatus status;

View File

@ -0,0 +1,31 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.notification.rule;
import lombok.Data;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.util.List;
@Data
public class NotificationEscalationConfig {
@NotNull
@Valid
private List<NonConfirmedNotificationEscalation> escalations;
}

View File

@ -27,10 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId;
import javax.validation.Valid; import javax.validation.Valid;
import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
@Data @Data
@EqualsAndHashCode(callSuper = true) @EqualsAndHashCode(callSuper = true)
@ -44,8 +40,7 @@ public class NotificationRule extends BaseData<NotificationRuleId> implements Ha
private String notificationTextTemplate; private String notificationTextTemplate;
@NotNull @NotNull
private NotificationTargetId initialNotificationTargetId; private NotificationTargetId initialNotificationTargetId;
@NotNull
@Valid @Valid
private List<NonConfirmedNotificationEscalation> escalations; private NotificationEscalationConfig escalationConfig;
} }

View File

@ -677,7 +677,9 @@ public class ModelConstants {
public static final String NOTIFICATION_REQUEST_RULE_ID_PROPERTY = "rule_id"; public static final String NOTIFICATION_REQUEST_RULE_ID_PROPERTY = "rule_id";
public static final String NOTIFICATION_RULE_TABLE_NAME = "notification_rule"; public static final String NOTIFICATION_RULE_TABLE_NAME = "notification_rule";
// ... public static final String NOTIFICATION_RULE_NOTIFICATION_TEXT_TEMPLATE_PROPERTY = "notification_text_template";
public static final String NOTIFICATION_RULE_INITIAL_NOTIFICATION_TARGET_ID_PROPERTY = "initial_notification_target_id";
public static final String NOTIFICATION_RULE_ESCALATION_CONFIG_PROPERTY = "escalation_config";
protected static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, JSON_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN}; protected static final String[] NONE_AGGREGATION_COLUMNS = new String[]{LONG_VALUE_COLUMN, DOUBLE_VALUE_COLUMN, BOOLEAN_VALUE_COLUMN, STRING_VALUE_COLUMN, JSON_VALUE_COLUMN, KEY_COLUMN, TS_COLUMN};

View File

@ -15,11 +15,15 @@
*/ */
package org.thingsboard.server.dao.model.sql; package org.thingsboard.server.dao.model.sql;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode; import lombok.EqualsAndHashCode;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef; import org.hibernate.annotations.TypeDef;
import org.thingsboard.server.common.data.id.NotificationRuleId; import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.NotificationTargetId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.NotificationEscalationConfig;
import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.dao.model.BaseSqlEntity; import org.thingsboard.server.dao.model.BaseSqlEntity;
import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.ModelConstants;
@ -42,6 +46,15 @@ public class NotificationRuleEntity extends BaseSqlEntity<NotificationRule> {
@Column(name = ModelConstants.NAME_PROPERTY, nullable = false) @Column(name = ModelConstants.NAME_PROPERTY, nullable = false)
private String name; private String name;
@Column(name = ModelConstants.NOTIFICATION_RULE_NOTIFICATION_TEXT_TEMPLATE_PROPERTY, nullable = false)
private String notificationTextTemplate;
@Column(name = ModelConstants.NOTIFICATION_RULE_INITIAL_NOTIFICATION_TARGET_ID_PROPERTY)
private UUID initialNotificationTargetId;
@Type(type = "json")
@Column(name = ModelConstants.NOTIFICATION_RULE_ESCALATION_CONFIG_PROPERTY)
private JsonNode escalationConfig;
public NotificationRuleEntity() {} public NotificationRuleEntity() {}
@ -50,6 +63,9 @@ public class NotificationRuleEntity extends BaseSqlEntity<NotificationRule> {
setCreatedTime(notificationRule.getCreatedTime()); setCreatedTime(notificationRule.getCreatedTime());
setTenantId(getUuid(notificationRule.getTenantId())); setTenantId(getUuid(notificationRule.getTenantId()));
setName(notificationRule.getName()); setName(notificationRule.getName());
setNotificationTextTemplate(notificationRule.getNotificationTextTemplate());
setInitialNotificationTargetId(getUuid(notificationRule.getInitialNotificationTargetId()));
setEscalationConfig(toJson(notificationRule.getEscalationConfig()));
} }
@Override @Override
@ -59,6 +75,9 @@ public class NotificationRuleEntity extends BaseSqlEntity<NotificationRule> {
notificationRule.setCreatedTime(createdTime); notificationRule.setCreatedTime(createdTime);
notificationRule.setTenantId(createId(tenantId, TenantId::fromUUID)); notificationRule.setTenantId(createId(tenantId, TenantId::fromUUID));
notificationRule.setName(name); notificationRule.setName(name);
notificationRule.setNotificationTextTemplate(notificationTextTemplate);
notificationRule.setInitialNotificationTargetId(createId(initialNotificationTargetId, NotificationTargetId::new));
notificationRule.setEscalationConfig(fromJson(escalationConfig, NotificationEscalationConfig.class));
return notificationRule; return notificationRule;
} }

View File

@ -20,6 +20,9 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.NotificationRuleId; import org.thingsboard.server.common.data.id.NotificationRuleId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.service.DataValidator;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -27,9 +30,25 @@ public class DefaultNotificationRuleService implements NotificationRuleService {
private final NotificationRuleDao notificationRuleDao; private final NotificationRuleDao notificationRuleDao;
private final NotificationRuleValidator validator = new NotificationRuleValidator();
@Override
public NotificationRule saveNotificationRule(TenantId tenantId, NotificationRule notificationRule) {
validator.validate(notificationRule, NotificationRule::getTenantId);
return notificationRuleDao.save(tenantId, notificationRule);
}
@Override @Override
public NotificationRule findNotificationRuleById(TenantId tenantId, NotificationRuleId notificationRuleId) { public NotificationRule findNotificationRuleById(TenantId tenantId, NotificationRuleId notificationRuleId) {
return notificationRuleDao.findById(tenantId, notificationRuleId.getId()); return notificationRuleDao.findById(tenantId, notificationRuleId.getId());
} }
@Override
public PageData<NotificationRule> findNotificationRulesByTenantId(TenantId tenantId, PageLink pageLink) {
return notificationRuleDao.findByTenantIdAndPageLink(tenantId, pageLink);
}
private static class NotificationRuleValidator extends DataValidator<NotificationRule> {
}
} }

View File

@ -57,7 +57,7 @@ public class DefaultNotificationTargetService implements NotificationTargetServi
} }
@Override @Override
public PageData<NotificationTarget> findNotificationTargetsByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink) { public PageData<NotificationTarget> findNotificationTargetsByTenantId(TenantId tenantId, PageLink pageLink) {
return notificationTargetDao.findByTenantIdAndPageLink(tenantId, pageLink); return notificationTargetDao.findByTenantIdAndPageLink(tenantId, pageLink);
} }

View File

@ -15,8 +15,14 @@
*/ */
package org.thingsboard.server.dao.notification; package org.thingsboard.server.dao.notification;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.Dao;
public interface NotificationRuleDao extends Dao<NotificationRule> { public interface NotificationRuleDao extends Dao<NotificationRule> {
PageData<NotificationRule> findByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink);
} }

View File

@ -15,10 +15,15 @@
*/ */
package org.thingsboard.server.dao.sql.notification; package org.thingsboard.server.dao.sql.notification;
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.NotificationRuleEntity; import org.thingsboard.server.dao.model.sql.NotificationRuleEntity;
import org.thingsboard.server.dao.notification.NotificationRuleDao; import org.thingsboard.server.dao.notification.NotificationRuleDao;
import org.thingsboard.server.dao.sql.JpaAbstractDao; import org.thingsboard.server.dao.sql.JpaAbstractDao;
@ -33,6 +38,12 @@ public class JpaNotificationRuleDao extends JpaAbstractDao<NotificationRuleEntit
private final NotificationRuleRepository notificationRuleRepository; private final NotificationRuleRepository notificationRuleRepository;
@Override
public PageData<NotificationRule> findByTenantIdAndPageLink(TenantId tenantId, PageLink pageLink) {
return DaoUtil.toPageData(notificationRuleRepository.findByTenantIdAndNameContainingIgnoreCase(tenantId.getId(),
Strings.nullToEmpty(pageLink.getTextSearch()), DaoUtil.toPageable(pageLink)));
}
@Override @Override
protected Class<NotificationRuleEntity> getEntityClass() { protected Class<NotificationRuleEntity> getEntityClass() {
return NotificationRuleEntity.class; return NotificationRuleEntity.class;

View File

@ -15,6 +15,8 @@
*/ */
package org.thingsboard.server.dao.sql.notification; package org.thingsboard.server.dao.sql.notification;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
import org.thingsboard.server.dao.model.sql.NotificationRuleEntity; import org.thingsboard.server.dao.model.sql.NotificationRuleEntity;
@ -23,4 +25,7 @@ import java.util.UUID;
@Repository @Repository
public interface NotificationRuleRepository extends JpaRepository<NotificationRuleEntity, UUID> { public interface NotificationRuleRepository extends JpaRepository<NotificationRuleEntity, UUID> {
Page<NotificationRuleEntity> findByTenantIdAndNameContainingIgnoreCase(UUID tenantId, String searchText, Pageable pageable);
} }

View File

@ -788,7 +788,13 @@ CREATE TABLE IF NOT EXISTS notification_target (
); );
CREATE TABLE IF NOT EXISTS notification_rule ( CREATE TABLE IF NOT EXISTS notification_rule (
id UUID NOT NULL CONSTRAINT notification_rule_pkey PRIMARY KEY id UUID NOT NULL CONSTRAINT notification_rule_pkey PRIMARY KEY,
created_time BIGINT NOT NULL,
tenant_id UUID NOT NULL,
name VARCHAR(255) NOT NULL,
notification_text_template VARCHAR NOT NULL,
initial_notification_target_id UUID NULL CONSTRAINT fk_notification_rule_target_id REFERENCES notification_target(id),
escalation_config VARCHAR(500)
); );
CREATE TABLE IF NOT EXISTS notification_request ( CREATE TABLE IF NOT EXISTS notification_request (