Merge branch 'improvements/notification-system' of github.com:thingsboard/thingsboard into improvements/notification-system

This commit is contained in:
Vladyslav_Prykhodko 2023-04-04 18:09:19 +03:00
commit 3b08ce56e0
15 changed files with 107 additions and 75 deletions

View File

@ -71,7 +71,6 @@ import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor;
import org.thingsboard.server.dao.notification.NotificationRequestService; import org.thingsboard.server.dao.notification.NotificationRequestService;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService;
import org.thingsboard.server.dao.notification.NotificationRuleService; import org.thingsboard.server.dao.notification.NotificationRuleService;
import org.thingsboard.server.dao.notification.NotificationTargetService; import org.thingsboard.server.dao.notification.NotificationTargetService;
import org.thingsboard.server.dao.notification.NotificationTemplateService; import org.thingsboard.server.dao.notification.NotificationTemplateService;
@ -90,6 +89,7 @@ import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.component.ComponentDiscoveryService;
@ -341,7 +341,7 @@ public class ActorSystemContext {
@Autowired @Autowired
@Getter @Getter
private NotificationRuleProcessingService notificationRuleProcessingService; private NotificationRuleProcessor notificationRuleProcessor;
@Autowired @Autowired
@Getter @Getter

View File

@ -50,7 +50,7 @@ public abstract class RuleEngineComponentActor<T extends EntityId, P extends Com
} }
private void processNotificationRule(ComponentLifecycleEvent event, Throwable e) { private void processNotificationRule(ComponentLifecycleEvent event, Throwable e) {
systemContext.getNotificationRuleProcessingService().process(RuleEngineComponentLifecycleEventTrigger.builder() systemContext.getNotificationRuleProcessor().process(RuleEngineComponentLifecycleEventTrigger.builder()
.tenantId(tenantId) .tenantId(tenantId)
.ruleChainId(getRuleChainId()) .ruleChainId(getRuleChainId())
.ruleChainName(getRuleChainName()) .ruleChainName(getRuleChainName())

View File

@ -49,6 +49,7 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.aware.DeviceAwareMsg; import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg; import org.thingsboard.server.common.msg.aware.RuleChainAwareMsg;
import org.thingsboard.server.common.msg.edge.EdgeSessionMsg; import org.thingsboard.server.common.msg.edge.EdgeSessionMsg;
import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg; import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@ -209,7 +210,10 @@ public class TenantActor extends RuleChainManagerActor {
log.trace("[{}] Ack message because Rule Engine is disabled", tenantId); log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
tbMsg.getCallback().onSuccess(); tbMsg.getCallback().onSuccess();
} }
systemContext.getNotificationRuleProcessingService().process(tenantId, tbMsg); systemContext.getNotificationRuleProcessor().process(RuleEngineMsgTrigger.builder()
.tenantId(tenantId)
.msg(tbMsg)
.build());
} }
private void onRuleChainMsg(RuleChainAwareMsg msg) { private void onRuleChainMsg(RuleChainAwareMsg msg) {

View File

@ -52,7 +52,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.SchedulerUtils; import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.notification.trigger.ApiUsageLimitTrigger; import org.thingsboard.server.common.msg.notification.trigger.ApiUsageLimitTrigger;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
@ -107,7 +107,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
private final ApiUsageStateService apiUsageStateService; private final ApiUsageStateService apiUsageStateService;
private final TbTenantProfileCache tenantProfileCache; private final TbTenantProfileCache tenantProfileCache;
private final MailService mailService; private final MailService mailService;
private final NotificationRuleProcessingService notificationRuleProcessingService; private final NotificationRuleProcessor notificationRuleProcessor;
private final DbCallbackExecutorService dbExecutor; private final DbCallbackExecutorService dbExecutor;
private final MailExecutorService mailExecutor; private final MailExecutorService mailExecutor;
@ -341,7 +341,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
String email = tenantService.findTenantById(state.getTenantId()).getEmail(); String email = tenantService.findTenantById(state.getTenantId()).getEmail();
result.forEach((apiFeature, stateValue) -> { result.forEach((apiFeature, stateValue) -> {
ApiUsageRecordState recordState = createApiUsageRecordState((TenantApiUsageState) state, apiFeature, stateValue); ApiUsageRecordState recordState = createApiUsageRecordState((TenantApiUsageState) state, apiFeature, stateValue);
notificationRuleProcessingService.process(ApiUsageLimitTrigger.builder() notificationRuleProcessor.process(ApiUsageLimitTrigger.builder()
.tenantId(state.getTenantId()) .tenantId(state.getTenantId())
.state(recordState) .state(recordState)
.status(stateValue) .status(stateValue)

View File

@ -35,13 +35,12 @@ import org.thingsboard.server.common.data.notification.rule.NotificationRule;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.dao.notification.NotificationRequestService;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService;
import org.thingsboard.server.dao.notification.NotificationRuleService;
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger; import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.dao.notification.NotificationRequestService;
import org.thingsboard.server.dao.notification.NotificationRuleService;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.service.executors.NotificationExecutorService; import org.thingsboard.server.service.executors.NotificationExecutorService;
import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor; import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor;
import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor; import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor;
@ -59,7 +58,7 @@ import java.util.stream.Collectors;
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j @Slf4j
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
public class DefaultNotificationRuleProcessingService implements NotificationRuleProcessingService { public class DefaultNotificationRuleProcessor implements NotificationRuleProcessor {
private final NotificationRuleService notificationRuleService; private final NotificationRuleService notificationRuleService;
private final NotificationRequestService notificationRequestService; private final NotificationRequestService notificationRequestService;
@ -69,12 +68,13 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
private final Map<NotificationRuleTriggerType, NotificationRuleTriggerProcessor> triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class); private final Map<NotificationRuleTriggerType, NotificationRuleTriggerProcessor> triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class);
private final Map<String, NotificationRuleTriggerType> ruleEngineMsgTypeToTriggerType = new HashMap<>();
@Override @Override
public void process(NotificationRuleTrigger trigger) { public void process(NotificationRuleTrigger trigger) {
List<NotificationRule> rules = notificationRuleService.findNotificationRulesByTenantIdAndTriggerType( NotificationRuleTriggerType triggerType = trigger.getType();
trigger.getType().isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID, trigger.getType()); if (triggerType == null) return;
TenantId tenantId = triggerType.isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID;
List<NotificationRule> rules = notificationRuleService.findNotificationRulesByTenantIdAndTriggerType(tenantId, triggerType);
for (NotificationRule rule : rules) { for (NotificationRule rule : rules) {
notificationExecutor.submit(() -> { notificationExecutor.submit(() -> {
try { try {
@ -86,19 +86,6 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
} }
} }
@Override
public void process(TenantId tenantId, TbMsg ruleEngineMsg) {
NotificationRuleTriggerType triggerType = ruleEngineMsgTypeToTriggerType.get(ruleEngineMsg.getType());
if (triggerType == null) {
return;
}
process(RuleEngineMsgTrigger.builder()
.tenantId(tenantId)
.msg(ruleEngineMsg)
.triggerType(triggerType)
.build());
}
private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger) { private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger) {
NotificationRuleTriggerConfig triggerConfig = rule.getTriggerConfig(); NotificationRuleTriggerConfig triggerConfig = rule.getTriggerConfig();
log.debug("Processing notification rule '{}' for trigger type {}", rule.getName(), rule.getTriggerType()); log.debug("Processing notification rule '{}' for trigger type {}", rule.getName(), rule.getTriggerType());
@ -192,6 +179,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
@Autowired @Autowired
public void setTriggerProcessors(Collection<NotificationRuleTriggerProcessor> processors) { public void setTriggerProcessors(Collection<NotificationRuleTriggerProcessor> processors) {
Map<String, NotificationRuleTriggerType> ruleEngineMsgTypeToTriggerType = new HashMap<>();
processors.forEach(processor -> { processors.forEach(processor -> {
triggerProcessors.put(processor.getTriggerType(), processor); triggerProcessors.put(processor.getTriggerType(), processor);
if (processor instanceof RuleEngineMsgNotificationRuleTriggerProcessor) { if (processor instanceof RuleEngineMsgNotificationRuleTriggerProcessor) {
@ -201,6 +189,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul
}); });
} }
}); });
RuleEngineMsgTrigger.msgTypeToTriggerType = ruleEngineMsgTypeToTriggerType;
} }
} }

View File

@ -53,7 +53,6 @@ import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -532,17 +531,6 @@ public class DefaultTbClusterService implements TbClusterService {
} }
} }
@Override
public void pushToNotificationRuleProcessingService(NotificationRuleTrigger notificationRuleTrigger) {
TransportProtos.NotificationRuleProcessingServiceMsg.Builder msg = TransportProtos.NotificationRuleProcessingServiceMsg.newBuilder()
.setTrigger(ByteString.copyFrom(encodingService.encode(notificationRuleTrigger)));
pushMsgToCore(notificationRuleTrigger.getTenantId(), notificationRuleTrigger.getOriginatorEntityId(),
ToCoreMsg.newBuilder()
.setNotificationRuleProcessingServiceMsg(msg)
.build(), null);
}
private void pushDeviceUpdateMessage(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) { private void pushDeviceUpdateMessage(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) {
log.trace("{} Going to send edge update notification for device actor, device id {}, edge id {}", tenantId, entityId, edgeId); log.trace("{} Going to send edge update notification for device actor, device id {}, edge id {}", tenantId, entityId, edgeId);
switch (action) { switch (action) {

View File

@ -40,7 +40,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
@ -131,7 +131,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
private final OtaPackageStateService firmwareStateService; private final OtaPackageStateService firmwareStateService;
private final GitVersionControlQueueService vcQueueService; private final GitVersionControlQueueService vcQueueService;
private final NotificationSchedulerService notificationSchedulerService; private final NotificationSchedulerService notificationSchedulerService;
private final NotificationRuleProcessingService notificationRuleProcessingService; private final NotificationRuleProcessor notificationRuleProcessor;
private final TbCoreConsumerStats stats; private final TbCoreConsumerStats stats;
protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer; protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
private final TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer; private final TbQueueConsumer<TbProtoQueueMsg<ToOtaPackageStateServiceMsg>> firmwareStatesConsumer;
@ -160,7 +160,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
ApplicationEventPublisher eventPublisher, ApplicationEventPublisher eventPublisher,
Optional<JwtSettingsService> jwtSettingsService, Optional<JwtSettingsService> jwtSettingsService,
NotificationSchedulerService notificationSchedulerService, NotificationSchedulerService notificationSchedulerService,
NotificationRuleProcessingService notificationRuleProcessingService) { NotificationRuleProcessor notificationRuleProcessor) {
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer(), jwtSettingsService); super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer(), jwtSettingsService);
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer(); this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
@ -175,7 +175,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
this.firmwareStateService = firmwareStateService; this.firmwareStateService = firmwareStateService;
this.vcQueueService = vcQueueService; this.vcQueueService = vcQueueService;
this.notificationSchedulerService = notificationSchedulerService; this.notificationSchedulerService = notificationSchedulerService;
this.notificationRuleProcessingService = notificationRuleProcessingService; this.notificationRuleProcessor = notificationRuleProcessor;
} }
@PostConstruct @PostConstruct
@ -274,9 +274,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg(); TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg()); log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback); forwardToNotificationSchedulerService(notificationSchedulerServiceMsg, callback);
} else if (toCoreMsg.hasNotificationRuleProcessingServiceMsg()) { } else if (toCoreMsg.hasNotificationRuleProcessorMsg()) {
Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreMsg.getNotificationRuleProcessingServiceMsg().getTrigger().toByteArray()); Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreMsg.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
notificationRuleTrigger.ifPresent(notificationRuleProcessingService::process); notificationRuleTrigger.ifPresent(notificationRuleProcessor::process);
} }
} catch (Throwable e) { } catch (Throwable e) {
log.warn("[{}] Failed to process message: {}", id, msg, e); log.warn("[{}] Failed to process message: {}", id, msg, e);

View File

@ -51,7 +51,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.alarm.AlarmOperationResult; import org.thingsboard.server.dao.alarm.AlarmOperationResult;
import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.entitiy.alarm.TbAlarmCommentService; import org.thingsboard.server.service.entitiy.alarm.TbAlarmCommentService;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
@ -70,7 +70,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
private final TbAlarmCommentService alarmCommentService; private final TbAlarmCommentService alarmCommentService;
private final TbApiUsageReportClient apiUsageClient; private final TbApiUsageReportClient apiUsageClient;
private final TbApiUsageStateService apiUsageStateService; private final TbApiUsageStateService apiUsageStateService;
private final NotificationRuleProcessingService notificationRuleProcessingService; private final NotificationRuleProcessor notificationRuleProcessor;
@Override @Override
protected String getExecutorPrefix() { protected String getExecutorPrefix() {
@ -235,7 +235,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
return TbSubscriptionUtils.toAlarmUpdateProto(tenantId, entityId, alarm); return TbSubscriptionUtils.toAlarmUpdateProto(tenantId, entityId, alarm);
}); });
} }
notificationRuleProcessingService.process(AlarmTrigger.builder() notificationRuleProcessor.process(AlarmTrigger.builder()
.tenantId(tenantId) .tenantId(tenantId)
.alarmUpdate(result) .alarmUpdate(result)
.build()); .build());
@ -253,7 +253,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
return TbSubscriptionUtils.toAlarmDeletedProto(tenantId, entityId, alarm); return TbSubscriptionUtils.toAlarmDeletedProto(tenantId, entityId, alarm);
}); });
} }
notificationRuleProcessingService.process(AlarmTrigger.builder() notificationRuleProcessor.process(AlarmTrigger.builder()
.tenantId(tenantId) .tenantId(tenantId)
.alarmUpdate(result) .alarmUpdate(result)
.build()); .build());

View File

@ -26,7 +26,7 @@ import org.springframework.web.client.RestTemplate;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.UpdateMessage; import org.thingsboard.server.common.data.UpdateMessage;
import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger; import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -57,7 +57,7 @@ public class DefaultUpdateService implements UpdateService {
private boolean updatesEnabled; private boolean updatesEnabled;
@Autowired @Autowired
private NotificationRuleProcessingService notificationRuleProcessingService; private NotificationRuleProcessor notificationRuleProcessor;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-update-service")); private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-update-service"));
@ -135,7 +135,7 @@ public class DefaultUpdateService implements UpdateService {
version version
); );
if (updateMessage.isUpdateAvailable() && !updateMessage.equals(prevUpdateMessage)) { if (updateMessage.isUpdateAvailable() && !updateMessage.equals(prevUpdateMessage)) {
notificationRuleProcessingService.process(NewPlatformVersionTrigger.builder() notificationRuleProcessor.process(NewPlatformVersionTrigger.builder()
.message(updateMessage) .message(updateMessage)
.build()); .build());
} }

View File

@ -31,13 +31,12 @@ import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg;
import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueClusterService; import org.thingsboard.server.queue.TbQueueClusterService;
@ -97,6 +96,4 @@ public interface TbClusterService extends TbQueueClusterService {
void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action);
void pushToNotificationRuleProcessingService(NotificationRuleTrigger notificationRuleTrigger);
} }

View File

@ -968,7 +968,7 @@ message ToCoreMsg {
EdgeNotificationMsgProto edgeNotificationMsg = 5; EdgeNotificationMsgProto edgeNotificationMsg = 5;
DeviceActivityProto deviceActivityMsg = 6; DeviceActivityProto deviceActivityMsg = 6;
NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7; NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7;
NotificationRuleProcessingServiceMsg notificationRuleProcessingServiceMsg = 8; NotificationRuleProcessorMsg notificationRuleProcessorMsg = 8;
} }
/* High priority messages with low latency are handled by ThingsBoard Core Service separately */ /* High priority messages with low latency are handled by ThingsBoard Core Service separately */
@ -1055,6 +1055,6 @@ message NotificationSchedulerServiceMsg {
int64 ts = 5; int64 ts = 5;
} }
message NotificationRuleProcessingServiceMsg { message NotificationRuleProcessorMsg {
bytes trigger = 1; bytes trigger = 1;
} }

View File

@ -13,16 +13,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.thingsboard.server.dao.notification; package org.thingsboard.server.common.msg.notification;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.TbMsg;
public interface NotificationRuleProcessingService { public interface NotificationRuleProcessor {
void process(NotificationRuleTrigger trigger); void process(NotificationRuleTrigger trigger);
void process(TenantId tenantId, TbMsg ruleEngineMsg);
} }

View File

@ -22,17 +22,20 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType;
import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsg;
import java.util.Map;
@Data @Data
@Builder @Builder
public class RuleEngineMsgTrigger implements NotificationRuleTrigger { public class RuleEngineMsgTrigger implements NotificationRuleTrigger {
private final TenantId tenantId; private final TenantId tenantId;
private final TbMsg msg; private final TbMsg msg;
private final NotificationRuleTriggerType triggerType;
public static Map<String, NotificationRuleTriggerType> msgTypeToTriggerType; // set on init by DefaultNotificationRuleProcessor
@Override @Override
public NotificationRuleTriggerType getType() { public NotificationRuleTriggerType getType() {
return triggerType; return msgTypeToTriggerType != null ? msgTypeToTriggerType.get(msg.getType()) : null;
} }
@Override @Override

View File

@ -0,0 +1,55 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.notification;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import java.util.UUID;
@Service
@ConditionalOnMissingBean(NotificationRuleProcessor.class)
@RequiredArgsConstructor
public class RemoteNotificationRuleProcessor implements NotificationRuleProcessor {
private final TbQueueProducerProvider producerProvider;
private final PartitionService partitionService;
private final DataDecodingEncodingService encodingService;
@Override
public void process(NotificationRuleTrigger trigger) {
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
.setTrigger(ByteString.copyFrom(encodingService.encode(trigger)));
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, trigger.getTenantId(), trigger.getOriginatorEntityId());
producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(),
TransportProtos.ToCoreMsg.newBuilder()
.setNotificationRuleProcessorMsg(msg)
.build()), null);
}
}

View File

@ -25,9 +25,9 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityTypeFilter; import org.thingsboard.server.common.data.query.EntityTypeFilter;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.dao.notification.NotificationRuleProcessingService;
import org.thingsboard.server.common.msg.notification.trigger.EntitiesLimitTrigger; import org.thingsboard.server.common.msg.notification.trigger.EntitiesLimitTrigger;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
@Service @Service
@ -37,7 +37,7 @@ public class DefaultApiLimitService implements ApiLimitService {
private final EntityService entityService; private final EntityService entityService;
private final TbTenantProfileCache tenantProfileCache; private final TbTenantProfileCache tenantProfileCache;
@Autowired(required = false) @Autowired(required = false)
private NotificationRuleProcessingService notificationRuleProcessingService; private NotificationRuleProcessor notificationRuleProcessor;
@Override @Override
public boolean checkEntitiesLimit(TenantId tenantId, EntityType entityType) { public boolean checkEntitiesLimit(TenantId tenantId, EntityType entityType) {
@ -47,8 +47,8 @@ public class DefaultApiLimitService implements ApiLimitService {
EntityTypeFilter filter = new EntityTypeFilter(); EntityTypeFilter filter = new EntityTypeFilter();
filter.setEntityType(entityType); filter.setEntityType(entityType);
long currentCount = entityService.countEntitiesByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), new EntityCountQuery(filter)); long currentCount = entityService.countEntitiesByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), new EntityCountQuery(filter));
if (notificationRuleProcessingService != null) { if (notificationRuleProcessor != null) {
notificationRuleProcessingService.process(EntitiesLimitTrigger.builder() notificationRuleProcessor.process(EntitiesLimitTrigger.builder()
.tenantId(tenantId) .tenantId(tenantId)
.entityType(entityType) .entityType(entityType)
.currentCount(currentCount) .currentCount(currentCount)