Fix for RemoteNotificationRuleProcessor

This commit is contained in:
ViacheslavKlimov 2023-04-07 13:14:23 +03:00
parent 20ec28b0c3
commit aef033b6c5
5 changed files with 33 additions and 15 deletions

View File

@ -35,12 +35,12 @@ import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
@ -274,9 +274,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
} else if (toCoreMsg.hasNotificationRuleProcessorMsg()) {
Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreMsg.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
notificationRuleTrigger.ifPresent(notificationRuleProcessor::process);
}
} catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e);
@ -361,6 +358,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
callback.onSuccess();
} else if (toCoreNotification.hasToSubscriptionMgrMsg()) {
forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback);
} else if (toCoreNotification.hasNotificationRuleProcessorMsg()) {
Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreNotification
.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
notificationRuleTrigger.ifPresent(notificationRuleProcessor::process);
callback.onSuccess();
}
if (statsEnabled) {
stats.log(toCoreNotification);

View File

@ -968,7 +968,6 @@ message ToCoreMsg {
EdgeNotificationMsgProto edgeNotificationMsg = 5;
DeviceActivityProto deviceActivityMsg = 6;
NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7;
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 8;
}
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */
@ -983,6 +982,7 @@ message ToCoreNotificationMsg {
bytes toEdgeSyncRequestMsg = 8;
bytes fromEdgeSyncResponseMsg = 9;
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10;
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.queue.notification;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
@ -25,6 +26,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.NotificationsTopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
@ -32,24 +34,33 @@ import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import java.util.UUID;
@Service
@ConditionalOnMissingBean(NotificationRuleProcessor.class)
@ConditionalOnMissingBean(value = NotificationRuleProcessor.class, ignored = RemoteNotificationRuleProcessor.class)
@RequiredArgsConstructor
@Slf4j
public class RemoteNotificationRuleProcessor implements NotificationRuleProcessor {
private final TbQueueProducerProvider producerProvider;
private final NotificationsTopicService notificationsTopicService;
private final PartitionService partitionService;
private final DataDecodingEncodingService encodingService;
@Override
public void process(NotificationRuleTrigger trigger) {
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
.setTrigger(ByteString.copyFrom(encodingService.encode(trigger)));
try {
log.trace("Submitting notification rule trigger: {}", trigger);
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
.setTrigger(ByteString.copyFrom(encodingService.encode(trigger)));
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, trigger.getTenantId(), trigger.getOriginatorEntityId());
producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(),
TransportProtos.ToCoreMsg.newBuilder()
.setNotificationRuleProcessorMsg(msg)
.build()), null);
partitionService.getAllServiceIds(ServiceType.TB_CORE).stream().findAny().ifPresent(serviceId -> {
TopicPartitionInfo tpi = notificationsTopicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);
producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(),
TransportProtos.ToCoreNotificationMsg.newBuilder()
.setNotificationRuleProcessorMsg(msg)
.build()), null);
});
} catch (Exception e) {
log.error("Failed to submit notification rule trigger: {}", trigger, e);
}
}
}

View File

@ -42,8 +42,7 @@ public class DefaultNotificationRuleService extends AbstractEntityService implem
@Override
public NotificationRule saveNotificationRule(TenantId tenantId, NotificationRule notificationRule) {
boolean created = notificationRule.getId() == null;
if (!created) {
if (notificationRule.getId() != null) {
NotificationRule oldNotificationRule = findNotificationRuleById(tenantId, notificationRule.getId());
if (notificationRule.getTriggerType() != oldNotificationRule.getTriggerType()) {
throw new IllegalArgumentException("Notification rule trigger type cannot be updated");

View File

@ -48,6 +48,12 @@ public class DefaultNotificationTemplateService extends AbstractEntityService im
@Override
public NotificationTemplate saveNotificationTemplate(TenantId tenantId, NotificationTemplate notificationTemplate) {
if (notificationTemplate.getId() != null) {
NotificationTemplate oldNotificationTemplate = findNotificationTemplateById(tenantId, notificationTemplate.getId());
if (notificationTemplate.getNotificationType() != oldNotificationTemplate.getNotificationType()) {
throw new IllegalArgumentException("Notification type cannot be updated");
}
}
try {
return notificationTemplateDao.saveAndFlush(tenantId, notificationTemplate);
} catch (Exception e) {