diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index c0fff5e387..26eab30d5d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -71,7 +71,6 @@ import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.dao.nosql.CassandraBufferedRateReadExecutor; import org.thingsboard.server.dao.nosql.CassandraBufferedRateWriteExecutor; import org.thingsboard.server.dao.notification.NotificationRequestService; -import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; import org.thingsboard.server.dao.notification.NotificationRuleService; import org.thingsboard.server.dao.notification.NotificationTargetService; import org.thingsboard.server.dao.notification.NotificationTemplateService; @@ -90,6 +89,7 @@ import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.component.ComponentDiscoveryService; @@ -341,7 +341,7 @@ public class ActorSystemContext { @Autowired @Getter - private NotificationRuleProcessingService notificationRuleProcessingService; + private NotificationRuleProcessor notificationRuleProcessor; @Autowired @Getter diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleEngineComponentActor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleEngineComponentActor.java index 892b655255..e1918466a8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleEngineComponentActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleEngineComponentActor.java @@ -50,7 +50,7 @@ public abstract class RuleEngineComponentActor { ApiUsageRecordState recordState = createApiUsageRecordState((TenantApiUsageState) state, apiFeature, stateValue); - notificationRuleProcessingService.process(ApiUsageLimitTrigger.builder() + notificationRuleProcessor.process(ApiUsageLimitTrigger.builder() .tenantId(state.getTenantId()) .state(recordState) .status(stateValue) diff --git a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessingService.java b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java similarity index 90% rename from application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessingService.java rename to application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java index 0c14938d7d..18d8198087 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/rule/DefaultNotificationRuleProcessor.java @@ -35,13 +35,12 @@ import org.thingsboard.server.common.data.notification.rule.NotificationRule; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerConfig; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; -import org.thingsboard.server.dao.notification.NotificationRequestService; -import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; -import org.thingsboard.server.dao.notification.NotificationRuleService; import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.notification.trigger.RuleEngineMsgTrigger; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.dao.notification.NotificationRequestService; +import org.thingsboard.server.dao.notification.NotificationRuleService; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.service.executors.NotificationExecutorService; import org.thingsboard.server.service.notification.rule.trigger.NotificationRuleTriggerProcessor; import org.thingsboard.server.service.notification.rule.trigger.RuleEngineMsgNotificationRuleTriggerProcessor; @@ -59,7 +58,7 @@ import java.util.stream.Collectors; @RequiredArgsConstructor @Slf4j @SuppressWarnings({"rawtypes", "unchecked"}) -public class DefaultNotificationRuleProcessingService implements NotificationRuleProcessingService { +public class DefaultNotificationRuleProcessor implements NotificationRuleProcessor { private final NotificationRuleService notificationRuleService; private final NotificationRequestService notificationRequestService; @@ -69,12 +68,13 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul private final Map triggerProcessors = new EnumMap<>(NotificationRuleTriggerType.class); - private final Map ruleEngineMsgTypeToTriggerType = new HashMap<>(); - @Override public void process(NotificationRuleTrigger trigger) { - List rules = notificationRuleService.findNotificationRulesByTenantIdAndTriggerType( - trigger.getType().isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID, trigger.getType()); + NotificationRuleTriggerType triggerType = trigger.getType(); + if (triggerType == null) return; + TenantId tenantId = triggerType.isTenantLevel() ? trigger.getTenantId() : TenantId.SYS_TENANT_ID; + + List rules = notificationRuleService.findNotificationRulesByTenantIdAndTriggerType(tenantId, triggerType); for (NotificationRule rule : rules) { notificationExecutor.submit(() -> { try { @@ -86,19 +86,6 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul } } - @Override - public void process(TenantId tenantId, TbMsg ruleEngineMsg) { - NotificationRuleTriggerType triggerType = ruleEngineMsgTypeToTriggerType.get(ruleEngineMsg.getType()); - if (triggerType == null) { - return; - } - process(RuleEngineMsgTrigger.builder() - .tenantId(tenantId) - .msg(ruleEngineMsg) - .triggerType(triggerType) - .build()); - } - private void processNotificationRule(NotificationRule rule, NotificationRuleTrigger trigger) { NotificationRuleTriggerConfig triggerConfig = rule.getTriggerConfig(); log.debug("Processing notification rule '{}' for trigger type {}", rule.getName(), rule.getTriggerType()); @@ -192,6 +179,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul @Autowired public void setTriggerProcessors(Collection processors) { + Map ruleEngineMsgTypeToTriggerType = new HashMap<>(); processors.forEach(processor -> { triggerProcessors.put(processor.getTriggerType(), processor); if (processor instanceof RuleEngineMsgNotificationRuleTriggerProcessor) { @@ -201,6 +189,7 @@ public class DefaultNotificationRuleProcessingService implements NotificationRul }); } }); + RuleEngineMsgTrigger.msgTypeToTriggerType = ruleEngineMsgTypeToTriggerType; } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index e4119d76e8..78ea88b32d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -53,7 +53,6 @@ import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; -import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -532,17 +531,6 @@ public class DefaultTbClusterService implements TbClusterService { } } - @Override - public void pushToNotificationRuleProcessingService(NotificationRuleTrigger notificationRuleTrigger) { - TransportProtos.NotificationRuleProcessingServiceMsg.Builder msg = TransportProtos.NotificationRuleProcessingServiceMsg.newBuilder() - .setTrigger(ByteString.copyFrom(encodingService.encode(notificationRuleTrigger))); - - pushMsgToCore(notificationRuleTrigger.getTenantId(), notificationRuleTrigger.getOriginatorEntityId(), - ToCoreMsg.newBuilder() - .setNotificationRuleProcessingServiceMsg(msg) - .build(), null); - } - private void pushDeviceUpdateMessage(TenantId tenantId, EdgeId edgeId, EntityId entityId, EdgeEventActionType action) { log.trace("{} Going to send edge update notification for device actor, device id {}, edge id {}", tenantId, entityId, edgeId); switch (action) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index a56ff865a6..ad2a5baebe 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -40,7 +40,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.stats.StatsFactory; -import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; @@ -131,7 +131,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> usageStatsConsumer; private final TbQueueConsumer> firmwareStatesConsumer; @@ -160,7 +160,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService jwtSettingsService, NotificationSchedulerService notificationSchedulerService, - NotificationRuleProcessingService notificationRuleProcessingService) { + NotificationRuleProcessor notificationRuleProcessor) { super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer(), jwtSettingsService); this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer(); @@ -175,7 +175,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService notificationRuleTrigger = encodingService.decode(toCoreMsg.getNotificationRuleProcessingServiceMsg().getTrigger().toByteArray()); - notificationRuleTrigger.ifPresent(notificationRuleProcessingService::process); + } else if (toCoreMsg.hasNotificationRuleProcessorMsg()) { + Optional notificationRuleTrigger = encodingService.decode(toCoreMsg.getNotificationRuleProcessorMsg().getTrigger().toByteArray()); + notificationRuleTrigger.ifPresent(notificationRuleProcessor::process); } } catch (Throwable e) { log.warn("[{}] Failed to process message: {}", id, msg, e); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java index 01a0c98a60..07243711f3 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultAlarmSubscriptionService.java @@ -51,7 +51,7 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.dao.alarm.AlarmOperationResult; import org.thingsboard.server.dao.alarm.AlarmService; -import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.entitiy.alarm.TbAlarmCommentService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -70,7 +70,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService private final TbAlarmCommentService alarmCommentService; private final TbApiUsageReportClient apiUsageClient; private final TbApiUsageStateService apiUsageStateService; - private final NotificationRuleProcessingService notificationRuleProcessingService; + private final NotificationRuleProcessor notificationRuleProcessor; @Override protected String getExecutorPrefix() { @@ -235,7 +235,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService return TbSubscriptionUtils.toAlarmUpdateProto(tenantId, entityId, alarm); }); } - notificationRuleProcessingService.process(AlarmTrigger.builder() + notificationRuleProcessor.process(AlarmTrigger.builder() .tenantId(tenantId) .alarmUpdate(result) .build()); @@ -253,7 +253,7 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService return TbSubscriptionUtils.toAlarmDeletedProto(tenantId, entityId, alarm); }); } - notificationRuleProcessingService.process(AlarmTrigger.builder() + notificationRuleProcessor.process(AlarmTrigger.builder() .tenantId(tenantId) .alarmUpdate(result) .build()); diff --git a/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java b/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java index bbb1047879..5010969b3b 100644 --- a/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/update/DefaultUpdateService.java @@ -26,7 +26,7 @@ import org.springframework.web.client.RestTemplate; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.UpdateMessage; import org.thingsboard.server.common.msg.notification.trigger.NewPlatformVersionTrigger; -import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.queue.util.TbCoreComponent; import javax.annotation.PostConstruct; @@ -57,7 +57,7 @@ public class DefaultUpdateService implements UpdateService { private boolean updatesEnabled; @Autowired - private NotificationRuleProcessingService notificationRuleProcessingService; + private NotificationRuleProcessor notificationRuleProcessor; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("tb-update-service")); @@ -135,7 +135,7 @@ public class DefaultUpdateService implements UpdateService { version ); if (updateMessage.isUpdateAvailable() && !updateMessage.equals(prevUpdateMessage)) { - notificationRuleProcessingService.process(NewPlatformVersionTrigger.builder() + notificationRuleProcessor.process(NewPlatformVersionTrigger.builder() .message(updateMessage) .build()); } diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index a32dcd44ce..8962a35187 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -31,13 +31,12 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; -import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; -import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToVersionControlServiceMsg; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueClusterService; @@ -97,6 +96,4 @@ public interface TbClusterService extends TbQueueClusterService { void sendNotificationMsgToEdge(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); - void pushToNotificationRuleProcessingService(NotificationRuleTrigger notificationRuleTrigger); - } diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index fc75bad5a3..7553d97b16 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -968,7 +968,7 @@ message ToCoreMsg { EdgeNotificationMsgProto edgeNotificationMsg = 5; DeviceActivityProto deviceActivityMsg = 6; NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7; - NotificationRuleProcessingServiceMsg notificationRuleProcessingServiceMsg = 8; + NotificationRuleProcessorMsg notificationRuleProcessorMsg = 8; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ @@ -1055,6 +1055,6 @@ message NotificationSchedulerServiceMsg { int64 ts = 5; } -message NotificationRuleProcessingServiceMsg { +message NotificationRuleProcessorMsg { bytes trigger = 1; } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/notification/NotificationRuleProcessingService.java b/common/message/src/main/java/org/thingsboard/server/common/msg/notification/NotificationRuleProcessor.java similarity index 74% rename from common/dao-api/src/main/java/org/thingsboard/server/dao/notification/NotificationRuleProcessingService.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/notification/NotificationRuleProcessor.java index f634bd60fc..380773c1b0 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/notification/NotificationRuleProcessingService.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/notification/NotificationRuleProcessor.java @@ -13,16 +13,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.dao.notification; +package org.thingsboard.server.common.msg.notification; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; -import org.thingsboard.server.common.msg.TbMsg; -public interface NotificationRuleProcessingService { +public interface NotificationRuleProcessor { void process(NotificationRuleTrigger trigger); - void process(TenantId tenantId, TbMsg ruleEngineMsg); - } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/notification/trigger/RuleEngineMsgTrigger.java b/common/message/src/main/java/org/thingsboard/server/common/msg/notification/trigger/RuleEngineMsgTrigger.java index 9d9d231311..361264a0c2 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/notification/trigger/RuleEngineMsgTrigger.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/notification/trigger/RuleEngineMsgTrigger.java @@ -22,17 +22,20 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTriggerType; import org.thingsboard.server.common.msg.TbMsg; +import java.util.Map; + @Data @Builder public class RuleEngineMsgTrigger implements NotificationRuleTrigger { private final TenantId tenantId; private final TbMsg msg; - private final NotificationRuleTriggerType triggerType; + + public static Map msgTypeToTriggerType; // set on init by DefaultNotificationRuleProcessor @Override public NotificationRuleTriggerType getType() { - return triggerType; + return msgTypeToTriggerType != null ? msgTypeToTriggerType.get(msg.getType()) : null; } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java new file mode 100644 index 0000000000..2e38266046 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2023 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.notification; + +import com.google.protobuf.ByteString; +import lombok.RequiredArgsConstructor; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; +import org.thingsboard.server.common.msg.notification.trigger.NotificationRuleTrigger; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import org.thingsboard.server.queue.util.DataDecodingEncodingService; + +import java.util.UUID; + +@Service +@ConditionalOnMissingBean(NotificationRuleProcessor.class) +@RequiredArgsConstructor +public class RemoteNotificationRuleProcessor implements NotificationRuleProcessor { + + private final TbQueueProducerProvider producerProvider; + private final PartitionService partitionService; + private final DataDecodingEncodingService encodingService; + + @Override + public void process(NotificationRuleTrigger trigger) { + TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder() + .setTrigger(ByteString.copyFrom(encodingService.encode(trigger))); + + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, trigger.getTenantId(), trigger.getOriginatorEntityId()); + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), + TransportProtos.ToCoreMsg.newBuilder() + .setNotificationRuleProcessorMsg(msg) + .build()), null); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java index 2ebd91bc24..d58c91c1c7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/DefaultApiLimitService.java @@ -25,9 +25,9 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityTypeFilter; import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; -import org.thingsboard.server.dao.entity.EntityService; -import org.thingsboard.server.dao.notification.NotificationRuleProcessingService; +import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.notification.trigger.EntitiesLimitTrigger; +import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; @Service @@ -37,7 +37,7 @@ public class DefaultApiLimitService implements ApiLimitService { private final EntityService entityService; private final TbTenantProfileCache tenantProfileCache; @Autowired(required = false) - private NotificationRuleProcessingService notificationRuleProcessingService; + private NotificationRuleProcessor notificationRuleProcessor; @Override public boolean checkEntitiesLimit(TenantId tenantId, EntityType entityType) { @@ -47,8 +47,8 @@ public class DefaultApiLimitService implements ApiLimitService { EntityTypeFilter filter = new EntityTypeFilter(); filter.setEntityType(entityType); long currentCount = entityService.countEntitiesByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), new EntityCountQuery(filter)); - if (notificationRuleProcessingService != null) { - notificationRuleProcessingService.process(EntitiesLimitTrigger.builder() + if (notificationRuleProcessor != null) { + notificationRuleProcessor.process(EntitiesLimitTrigger.builder() .tenantId(tenantId) .entityType(entityType) .currentCount(currentCount)