diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 7822438724..fb4ff77359 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -159,10 +159,6 @@ public class ActorSystemContext { @Getter private TbClusterService clusterService; - @Autowired - @Getter - private TbQueueProducerProvider producerProvider; - @Autowired @Getter private TimeseriesService tsService; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 0e8033fac2..13f7b1122d 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -17,10 +17,8 @@ package org.thingsboard.server.actors.ruleChain; import akka.actor.ActorRef; import com.datastax.driver.core.ResultSetFuture; -import com.datastax.driver.core.utils.UUIDs; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import io.netty.channel.EventLoopGroup; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; @@ -36,7 +34,6 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.EntityId; @@ -45,8 +42,8 @@ import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.rule.RuleNode; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; @@ -62,11 +59,9 @@ import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.user.UserService; -import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import scala.concurrent.duration.Duration; @@ -136,7 +131,7 @@ class DefaultTbContext implements TbContext { .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(tbMsg)).build(); - mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), new SimpleTbQueueCallback(onSuccess, onFailure)); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(onSuccess, onFailure)); } @Override @@ -193,7 +188,7 @@ class DefaultTbContext implements TbContext { if (failureMessage != null) { msg.setFailureMessage(failureMessage); } - mainCtx.getProducerProvider().getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg.build()), new SimpleTbQueueCallback(onSuccess, onFailure)); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg.build(), new SimpleTbQueueCallback(onSuccess, onFailure)); } @Override diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index dd0a515a33..a374e590b5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -44,10 +44,9 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.rule.RuleChainService; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.queue.TbQueueCallback; -import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper; +import org.thingsboard.server.service.queue.TbClusterService; import java.util.ArrayList; import java.util.Collections; @@ -68,7 +67,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor nodeActors; private final Map> nodeRoutes; private final RuleChainService service; - private final TbQueueProducer> producer; + private final TbClusterService clusterService; private String ruleChainName; private RuleNodeId firstId; @@ -84,7 +83,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(); this.nodeRoutes = new HashMap<>(); this.service = systemContext.getRuleChainService(); - this.producer = systemContext.getProducerProvider().getRuleEngineMsgProducer(); + this.clusterService = systemContext.getClusterService(); } @Override @@ -255,7 +254,6 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor(newMsg.getId(), toQueueMsg), callbackWrapper); + clusterService.pushMsgToRuleEngine(tpi, newMsg.getId(), toQueueMsg, callbackWrapper); } private boolean contains(Set relationTypes, String type) { diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java index 537e897583..7b04e82786 100644 --- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java +++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java @@ -97,96 +97,4 @@ public class DefaultActorService implements ActorService { } } - @Value("${cluster.stats.enabled:false}") - private boolean statsEnabled; - - //TODO 2.5 - private final AtomicInteger sentClusterMsgs = new AtomicInteger(0); - private final AtomicInteger receivedClusterMsgs = new AtomicInteger(0); - - - @Scheduled(fixedDelayString = "${cluster.stats.print_interval_ms}") - public void printStats() { - if (statsEnabled) { - int sent = sentClusterMsgs.getAndSet(0); - int received = receivedClusterMsgs.getAndSet(0); - if (sent > 0 || received > 0) { - log.info("Cluster msgs sent [{}] received [{}]", sent, received); - } - } - } - - //TODO 2.5 -// @Override -// public void onReceivedMsg(ServerAddress source, ClusterAPIProtos.ClusterMessage msg) { -// if (statsEnabled) { -// receivedClusterMsgs.incrementAndGet(); -// } -// ServerAddress serverAddress = new ServerAddress(source.getHost(), source.getPort(), source.getServerType()); -// if (log.isDebugEnabled()) { -// log.info("Received msg [{}] from [{}]", msg.getMessageType().name(), serverAddress); -// log.info("MSG: {}", msg); -// } -// switch (msg.getMessageType()) { -// case CLUSTER_ACTOR_MESSAGE: -// java.util.Optional decodedMsg = actorContext.getEncodingService() -// .decode(msg.getPayload().toByteArray()); -// if (decodedMsg.isPresent()) { -// appActor.tell(decodedMsg.get(), ActorRef.noSender()); -// } else { -// log.error("Error during decoding cluster proto message"); -// } -// break; -// case TO_ALL_NODES_MSG: -// //TODO -// break; -// case CLUSTER_TELEMETRY_SUBSCRIPTION_CREATE_MESSAGE: -// actorContext.getTsSubService().onNewRemoteSubscription(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_TELEMETRY_SUBSCRIPTION_UPDATE_MESSAGE: -// actorContext.getTsSubService().onRemoteSubscriptionUpdate(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_TELEMETRY_SUBSCRIPTION_CLOSE_MESSAGE: -// actorContext.getTsSubService().onRemoteSubscriptionClose(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_TELEMETRY_SESSION_CLOSE_MESSAGE: -// actorContext.getTsSubService().onRemoteSessionClose(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_TELEMETRY_ATTR_UPDATE_MESSAGE: -// actorContext.getTsSubService().onRemoteAttributesUpdate(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_TELEMETRY_TS_UPDATE_MESSAGE: -// actorContext.getTsSubService().onRemoteTsUpdate(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_RPC_FROM_DEVICE_RESPONSE_MESSAGE: -// actorContext.getDeviceRpcService().processResponseToServerSideRPCRequestFromRemoteServer(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_DEVICE_STATE_SERVICE_MESSAGE: -// actorContext.getDeviceStateService().onRemoteMsg(serverAddress, msg.getPayload().toByteArray()); -// break; -// case CLUSTER_TRANSACTION_SERVICE_MESSAGE: -// actorContext.getRuleChainTransactionService().onRemoteTransactionMsg(serverAddress, msg.getPayload().toByteArray()); -// break; -// } -// } -// @Override -// public void onSendMsg(ClusterAPIProtos.ClusterMessage msg) { -// if (statsEnabled) { -// sentClusterMsgs.incrementAndGet(); -// } -// rpcManagerActor.tell(msg, ActorRef.noSender()); -// } -// -// @Override -// public void onRpcSessionCreateRequestMsg(RpcSessionCreateRequestMsg msg) { -// if (statsEnabled) { -// sentClusterMsgs.incrementAndGet(); -// } -// rpcManagerActor.tell(msg, ActorRef.noSender()); -// } -// @Override -// public void onBroadcastMsg(RpcBroadcastMsg msg) { -// rpcManagerActor.tell(msg, ActorRef.noSender()); -// } - } diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index fbccaedb9e..73b25d4de2 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.controller; -import com.datastax.driver.core.utils.UUIDs; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -668,7 +667,7 @@ public abstract class BaseController { } } TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); - tbClusterService.onToRuleEngineMsg(user.getTenantId(), entityId, tbMsg); + tbClusterService.pushMsgToRuleEngine(user.getTenantId(), entityId, tbMsg, null); } catch (Exception e) { log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e); } diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java index afa394b98c..ca61ecc8e9 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceController.java @@ -99,8 +99,8 @@ public class DeviceController extends BaseController { Device savedDevice = checkNotNull(deviceService.saveDeviceWithAccessToken(device, accessToken)); - tbClusterService.onToCoreMsg(new DeviceNameOrTypeUpdateMsg(savedDevice.getTenantId(), - savedDevice.getId(), savedDevice.getName(), savedDevice.getType())); + tbClusterService.pushMsgToCore(new DeviceNameOrTypeUpdateMsg(savedDevice.getTenantId(), + savedDevice.getId(), savedDevice.getName(), savedDevice.getType()), null); logEntityAction(savedDevice.getId(), savedDevice, savedDevice.getCustomerId(), @@ -254,7 +254,7 @@ public class DeviceController extends BaseController { Device device = checkDeviceId(deviceCredentials.getDeviceId(), Operation.WRITE_CREDENTIALS); DeviceCredentials result = checkNotNull(deviceCredentialsService.updateDeviceCredentials(getCurrentUser().getTenantId(), deviceCredentials)); - tbClusterService.onToCoreMsg(new DeviceCredentialsUpdateNotificationMsg(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId())); + tbClusterService.pushMsgToCore(new DeviceCredentialsUpdateNotificationMsg(getCurrentUser().getTenantId(), deviceCredentials.getDeviceId()), null); logEntityAction(device.getId(), device, device.getCustomerId(), diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index d0c4be2386..8352e3f457 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -364,8 +364,8 @@ public class TelemetryController extends BaseController { DeviceId deviceId = new DeviceId(entityId.getId()); Set keysToNotify = new HashSet<>(); keys.forEach(key -> keysToNotify.add(new AttributeKey(scope, key))); - tbClusterService.onToCoreMsg(DeviceAttributesEventNotificationMsg.onDelete( - user.getTenantId(), deviceId, keysToNotify)); + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( + user.getTenantId(), deviceId, keysToNotify), null); } result.setResult(new ResponseEntity<>(HttpStatus.OK)); } @@ -399,8 +399,8 @@ public class TelemetryController extends BaseController { logAttributesUpdated(user, entityId, scope, attributes, null); if (entityId.getEntityType() == EntityType.DEVICE) { DeviceId deviceId = new DeviceId(entityId.getId()); - tbClusterService.onToCoreMsg(DeviceAttributesEventNotificationMsg.onUpdate( - user.getTenantId(), deviceId, scope, attributes)); + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate( + user.getTenantId(), deviceId, scope, attributes), null); } result.setResult(new ResponseEntity(HttpStatus.OK)); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index f93657427d..f0054771d9 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -17,6 +17,8 @@ package org.thingsboard.server.service.queue; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.data.EntityType; @@ -27,7 +29,13 @@ import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.gen.transport.TransportProtos.*; +import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; @@ -38,12 +46,22 @@ import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import java.util.HashSet; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; @Service @Slf4j public class DefaultTbClusterService implements TbClusterService { - protected TbQueueProducerProvider producerProvider; + @Value("${cluster.stats.enabled:false}") + private boolean statsEnabled; + + private final AtomicInteger toCoreMsgs = new AtomicInteger(0); + private final AtomicInteger toCoreNfs = new AtomicInteger(0); + private final AtomicInteger toRuleEngineMsgs = new AtomicInteger(0); + private final AtomicInteger toRuleEngineNfs = new AtomicInteger(0); + private final AtomicInteger toTransportNfs = new AtomicInteger(0); + + private final TbQueueProducerProvider producerProvider; private final PartitionService partitionService; private final DataDecodingEncodingService encodingService; @@ -54,7 +72,48 @@ public class DefaultTbClusterService implements TbClusterService { } @Override - public void onToRuleEngineMsg(TenantId tenantId, EntityId entityId, TbMsg tbMsg) { + public void pushMsgToCore(TenantId tenantId, EntityId entityId, ToCoreMsg msg, TbQueueCallback callback) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), callback); + toCoreMsgs.incrementAndGet(); + } + + @Override + public void pushMsgToCore(TopicPartitionInfo tpi, UUID msgId, ToCoreMsg msg, TbQueueCallback callback) { + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); + toCoreMsgs.incrementAndGet(); + } + + @Override + public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); + byte[] msgBytes = encodingService.encode(msg); + ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build(); + producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); + toCoreMsgs.incrementAndGet(); + } + + @Override + public void pushNotificationToCore(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { + TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); + FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() + .setRequestIdMSB(response.getId().getMostSignificantBits()) + .setRequestIdLSB(response.getId().getLeastSignificantBits()) + .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1); + response.getResponse().ifPresent(builder::setResponse); + ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build(); + producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), callback); + toCoreNfs.incrementAndGet(); + } + + @Override + public void pushMsgToRuleEngine(TopicPartitionInfo tpi, UUID msgId, ToRuleEngineMsg msg, TbQueueCallback callback) { + producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(msgId, msg), callback); + toRuleEngineMsgs.incrementAndGet(); + } + + @Override + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { if (tenantId.isNullUid()) { if (entityId.getEntityType().equals(EntityType.TENANT)) { tenantId = new TenantId(entityId.getId()); @@ -68,31 +127,12 @@ public class DefaultTbClusterService implements TbClusterService { .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) .setTbMsg(TbMsg.toByteString(tbMsg)).build(); - producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null); + producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), callback); + toRuleEngineMsgs.incrementAndGet(); } @Override - public void onToCoreMsg(ToDeviceActorNotificationMsg msg) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); - byte[] msgBytes = encodingService.encode(msg); - ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build(); - producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), null); - } - - @Override - public void onToCoreMsg(String serviceId, FromDeviceRpcResponse response) { - TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); - FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() - .setRequestIdMSB(response.getId().getMostSignificantBits()) - .setRequestIdLSB(response.getId().getLeastSignificantBits()) - .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1); - response.getResponse().ifPresent(builder::setResponse); - ToCoreNotificationMsg msg = ToCoreNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build(); - producerProvider.getTbCoreNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), null); - } - - @Override - public void onToRuleEngineMsg(String serviceId, FromDeviceRpcResponse response) { + public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); FromDeviceRPCResponseProto.Builder builder = FromDeviceRPCResponseProto.newBuilder() .setRequestIdMSB(response.getId().getMostSignificantBits()) @@ -100,13 +140,15 @@ public class DefaultTbClusterService implements TbClusterService { .setError(response.getError().isPresent() ? response.getError().get().ordinal() : -1); response.getResponse().ifPresent(builder::setResponse); ToRuleEngineNotificationMsg msg = ToRuleEngineNotificationMsg.newBuilder().setFromDeviceRpcResponse(builder).build(); - producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), null); + producerProvider.getRuleEngineNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(response.getId(), msg), callback); + toRuleEngineNfs.incrementAndGet(); } @Override - public void onToTransportMsg(String serviceId, ToTransportMsg response) { + public void pushNotificationToTransport(String serviceId, ToTransportMsg response, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, serviceId); - producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), null); + producerProvider.getTransportNotificationsMsgProducer().send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), response), callback); + toTransportNfs.incrementAndGet(); } @Override @@ -120,12 +162,13 @@ public class DefaultTbClusterService implements TbClusterService { TbQueueProducer> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); Set tbRuleEngineServices = new HashSet<>(partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE)); if (msg.getEntityId().getEntityType().equals(EntityType.TENANT)) { - TbQueueProducer> toCoreProducer = producerProvider.getTbCoreNotificationsMsgProducer(); + TbQueueProducer> toCoreNfProducer = producerProvider.getTbCoreNotificationsMsgProducer(); Set tbCoreServices = partitionService.getAllServiceIds(ServiceType.TB_CORE); for (String serviceId : tbCoreServices) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build(); - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null); + toCoreNfProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toCoreMsg), null); + toCoreNfs.incrementAndGet(); } // No need to push notifications twice tbRuleEngineServices.removeAll(tbCoreServices); @@ -134,6 +177,22 @@ public class DefaultTbClusterService implements TbClusterService { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); ToRuleEngineNotificationMsg toRuleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setComponentLifecycleMsg(ByteString.copyFrom(msgBytes)).build(); toRuleEngineProducer.send(tpi, new TbProtoQueueMsg<>(msg.getEntityId().getId(), toRuleEngineMsg), null); + toRuleEngineNfs.incrementAndGet(); + } + } + + @Scheduled(fixedDelayString = "${cluster.stats.print_interval_ms}") + public void printStats() { + if (statsEnabled) { + int toCoreMsgCnt = toCoreMsgs.getAndSet(0); + int toCoreNfsCnt = toCoreNfs.getAndSet(0); + int toRuleEngineMsgsCnt = toRuleEngineMsgs.getAndSet(0); + int toRuleEngineNfsCnt = toRuleEngineNfs.getAndSet(0); + int toTransportNfsCnt = toTransportNfs.getAndSet(0); + if (toCoreMsgCnt > 0 || toCoreNfsCnt > 0 || toRuleEngineMsgsCnt > 0 || toRuleEngineNfsCnt > 0 || toTransportNfsCnt > 0) { + log.info("To TbCore: [{}] messages [{}] notifications; To TbRuleEngine: [{}] messages [{}] notifications; To Transport: [{}] notifications", + toCoreMsgCnt, toCoreNfsCnt, toRuleEngineMsgsCnt, toRuleEngineNfsCnt, toTransportNfsCnt); + } } } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 263e6eab24..c943e4c4d0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -183,21 +183,24 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService msg, TbCallback callback) { - ToCoreNotificationMsg toCoreMsg = msg.getValue(); - if (toCoreMsg.hasToLocalSubscriptionServiceMsg()) { - log.trace("[{}] Forwarding message to local subscription service {}", id, toCoreMsg.getToLocalSubscriptionServiceMsg()); - forwardToLocalSubMgrService(toCoreMsg.getToLocalSubscriptionServiceMsg(), callback); - } else if (toCoreMsg.hasFromDeviceRpcResponse()) { - log.trace("[{}] Forwarding message to RPC service {}", id, toCoreMsg.getFromDeviceRpcResponse()); - forwardToCoreRpcService(toCoreMsg.getFromDeviceRpcResponse(), callback); - } else if (toCoreMsg.getComponentLifecycleMsg() != null && !toCoreMsg.getComponentLifecycleMsg().isEmpty()) { - Optional actorMsg = encodingService.decode(toCoreMsg.getComponentLifecycleMsg().toByteArray()); + ToCoreNotificationMsg toCoreNotification = msg.getValue(); + if (toCoreNotification.hasToLocalSubscriptionServiceMsg()) { + log.trace("[{}] Forwarding message to local subscription service {}", id, toCoreNotification.getToLocalSubscriptionServiceMsg()); + forwardToLocalSubMgrService(toCoreNotification.getToLocalSubscriptionServiceMsg(), callback); + } else if (toCoreNotification.hasFromDeviceRpcResponse()) { + log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse()); + forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback); + } else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) { + Optional actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); if (actorMsg.isPresent()) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); actorContext.tell(actorMsg.get(), ActorRef.noSender()); } callback.onSuccess(); } + if (statsEnabled) { + stats.log(toCoreNotification); + } } private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) { @@ -246,6 +249,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService 0) { - log.info("Transport total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]", + log.info("Transport total [{}] sessionEvents [{}] getAttr [{}] subToAttr [{}] subToRpc [{}] toDevRpc [{}] subInfo [{}] claimDevice [{}]" + + " deviceState [{}] subMgr [{}] coreNfs [{}]", total, sessionEventCounter.getAndSet(0), getAttributesCounter.getAndSet(0), subscribeToAttributesCounter.getAndSet(0), subscribeToRPCCounter.getAndSet(0), toDeviceRPCCallResponseCounter.getAndSet(0), - subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0)); + subscriptionInfoCounter.getAndSet(0), claimDeviceCounter.getAndSet(0) + , deviceStateCounter.getAndSet(0), subscriptionMsgCounter.getAndSet(0), toCoreNotificationsCounter.getAndSet(0)); } } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 6d70056af0..8a1423f97c 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.rpc; import akka.actor.ActorRef; -import com.datastax.driver.core.utils.UUIDs; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -147,7 +146,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds."); } } else { - clusterService.onToRuleEngineMsg(originServiceId, response); + clusterService.pushNotificationToRuleEngine(originServiceId, response, null); } } @@ -170,7 +169,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { try { TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); - clusterService.onToRuleEngineMsg(msg.getTenantId(), msg.getDeviceId(), tbMsg); + clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null); } catch (JsonProcessingException e) { throw new RuntimeException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 909690ff6c..0ec730b7dc 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -22,7 +22,6 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; -import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -96,7 +95,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi .setSessionIdLSB(sessionId.getLeastSignificantBits()) .setToServerResponse(responseMsg) .build(); - clusterService.onToTransportMsg(serviceId, msg); + clusterService.pushNotificationToTransport(serviceId, msg, null); } @Override @@ -148,7 +147,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi } } else { log.trace("[{}] Forwarding msg {} to queue actor!", msg.getDeviceId(), msg); - clusterService.onToCoreMsg(rpcMsg); + clusterService.pushMsgToCore(rpcMsg, null); } } @@ -160,7 +159,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi log.warn("Failed to find tbCoreRpcService for local service. Possible duplication of serviceIds."); } } else { - clusterService.onToCoreMsg(originServiceId, response); + clusterService.pushNotificationToCore(originServiceId, response, null); } } diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 87f46f2072..1e3ab73a26 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -29,7 +29,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; @@ -56,8 +55,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import javax.annotation.Nullable; @@ -106,7 +105,7 @@ public class DefaultDeviceStateService implements DeviceStateService { private final DeviceService deviceService; private final AttributesService attributesService; private final TimeseriesService tsService; - private final TbQueueProducerProvider producerProvider; + private final TbClusterService clusterService; private final PartitionService partitionService; private TelemetrySubscriptionService tsSubService; @@ -137,12 +136,12 @@ public class DefaultDeviceStateService implements DeviceStateService { public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService, AttributesService attributesService, TimeseriesService tsService, - TbQueueProducerProvider producerProvider, PartitionService partitionService) { + TbClusterService clusterService, PartitionService partitionService) { this.tenantService = tenantService; this.deviceService = deviceService; this.attributesService = attributesService; this.tsService = tsService; - this.producerProvider = producerProvider; + this.clusterService = clusterService; this.partitionService = partitionService; } @@ -413,8 +412,6 @@ public class DefaultDeviceStateService implements DeviceStateService { } private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, boolean added, boolean updated, boolean deleted) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId); - log.trace("[{}][{}] Device is monitored on partition: {}", tenantId, deviceId, tpi); TransportProtos.DeviceStateServiceMsgProto.Builder builder = TransportProtos.DeviceStateServiceMsgProto.newBuilder(); builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); @@ -424,8 +421,7 @@ public class DefaultDeviceStateService implements DeviceStateService { builder.setUpdated(updated); builder.setDeleted(deleted); TransportProtos.DeviceStateServiceMsgProto msg = builder.build(); - producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(deviceId.getId(), - TransportProtos.ToCoreMsg.newBuilder().setDeviceStateServiceMsg(msg).build()), null); + clusterService.pushMsgToCore(tenantId, deviceId, TransportProtos.ToCoreMsg.newBuilder().setDeviceStateServiceMsg(msg).build(), null); } private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) { @@ -497,12 +493,7 @@ public class DefaultDeviceStateService implements DeviceStateService { try { TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON , json.writeValueAsString(state)); - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, stateData.getTenantId(), stateData.getDeviceId()); - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(stateData.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(stateData.getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); - producerProvider.getRuleEngineMsgProducer().send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), null); + clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null); } catch (Exception e) { log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index e841acae12..67c81ac06f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -245,8 +245,9 @@ public class DefaultSubscriptionManagerService implements SubscriptionManagerSer } } } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope)) { - clusterService.onToCoreMsg(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, - new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))); + clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, + new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) + , null); } } callback.onSuccess(); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 12c5e206e3..d57067fa28 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -28,16 +28,14 @@ import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.ClusterTopologyChangeEvent; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; +import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.sub.SubscriptionUpdate; @@ -70,19 +68,17 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer private PartitionService partitionService; @Autowired - private TbQueueProducerProvider producerProvider; + private TbClusterService clusterService; @Autowired @Lazy private SubscriptionManagerService subscriptionManagerService; private ExecutorService wsCallBackExecutor; - private TbQueueProducer> toCoreProducer; @PostConstruct public void initExecutor() { wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-sub-callback")); - toCoreProducer = producerProvider.getTbCoreMsgProducer(); } @PreDestroy @@ -140,7 +136,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } else { // Push to the queue; TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toNewSubscriptionProto(subscription); - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg), null); + clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); } } @@ -181,7 +177,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } else { // Push to the queue; TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toCloseSubscriptionProto(subscription); - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(subscription.getEntityId().getId(), toCoreMsg), null); + clusterService.pushMsgToCore(tpi, subscription.getEntityId().getId(), toCoreMsg, null); } } else { log.debug("[{}][{}] Subscription not found!", sessionId, subscriptionId); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index b3c728a25b..665a787e6d 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -32,17 +32,15 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.msg.queue.ServiceType; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.TbQueueProducer; -import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.queue.discovery.PartitionService; -import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.common.msg.queue.TbCallback; -import org.thingsboard.server.queue.provider.TbQueueProducerProvider; +import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; @@ -69,22 +67,20 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio private final AttributesService attrService; private final TimeseriesService tsService; - private final TbQueueProducerProvider producerProvider; + private final TbClusterService clusterService; private final PartitionService partitionService; private Optional subscriptionManagerService; - private TbQueueProducer> toCoreProducer; - private ExecutorService tsCallBackExecutor; private ExecutorService wsCallBackExecutor; public DefaultTelemetrySubscriptionService(AttributesService attrService, TimeseriesService tsService, - TbQueueProducerProvider producerProvider, + TbClusterService clusterService, PartitionService partitionService) { this.attrService = attrService; this.tsService = tsService; - this.producerProvider = producerProvider; + this.clusterService = clusterService; this.partitionService = partitionService; } @@ -97,7 +93,6 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio public void initExecutor() { tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ts-callback")); wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-service-ws-callback")); - toCoreProducer = producerProvider.getTbCoreMsgProducer(); } @PreDestroy @@ -172,7 +167,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio } } else { TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toAttributesUpdateProto(tenantId, entityId, scope, attributes); - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null); + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null); } } @@ -186,7 +181,7 @@ public class DefaultTelemetrySubscriptionService implements TelemetrySubscriptio } } else { TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesUpdateProto(tenantId, entityId, ts); - toCoreProducer.send(tpi, new TbProtoQueueMsg<>(entityId.getId(), toCoreMsg), null); + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java index 0ebd59ec24..bced3f83ac 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/TbTransportQueueProducerProvider.java @@ -23,13 +23,11 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import javax.annotation.PostConstruct; -//TODO 2.5 Maybe remove this service if it is not used. @Service @ConditionalOnExpression("'${service.type:null}'=='tb-transport'") public class TbTransportQueueProducerProvider implements TbQueueProducerProvider { private final TbTransportQueueFactory tbQueueProvider; - private TbQueueProducer> toTransport; private TbQueueProducer> toRuleEngine; private TbQueueProducer> toTbCore; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 4299aeb867..daab510102 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -211,7 +211,7 @@ public interface TbContext { ResultSetFuture submitCassandraTask(CassandraStatementTask task); - //TODO 2.5: - need to remove this. + @Deprecated RedisTemplate getRedisTemplate(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java index 10e664bc16..19d8515455 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java @@ -75,7 +75,6 @@ public class TbMsgCountNode implements TbNode { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay)); - //TODO 2.5: Callback? TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), metaData, gson.toJson(telemetryJson)); ctx.enqueueForTellNext(tbMsg, SUCCESS); scheduleTickMsg(ctx); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java index 5b30f4792c..ba860c103b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbHttpClient.java @@ -23,7 +23,6 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.http.client.Netty4ClientHttpRequestFactory; import org.springframework.util.concurrent.ListenableFuture; @@ -84,7 +83,7 @@ class TbHttpClient { } } - void processMessage(TbContext ctx, TbMsg msg, TbRedisQueueProcessor queueProcessor) { + void processMessage(TbContext ctx, TbMsg msg) { String endpointUrl = TbNodeUtils.processPattern(config.getRestEndpointUrlPattern(), msg.getMetaData()); HttpHeaders headers = prepareHeaders(msg.getMetaData()); HttpMethod method = HttpMethod.valueOf(config.getRequestMethod()); @@ -95,13 +94,6 @@ class TbHttpClient { future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable throwable) { - if (config.isUseRedisQueueForMsgPersistence()) { - if (throwable instanceof HttpClientErrorException) { - processHttpClientError(((HttpClientErrorException) throwable).getStatusCode(), msg, queueProcessor); - } else { - queueProcessor.pushOnFailure(msg); - } - } TbMsg next = processException(ctx, msg, throwable); ctx.tellFailure(next, throwable); } @@ -109,15 +101,9 @@ class TbHttpClient { @Override public void onSuccess(ResponseEntity responseEntity) { if (responseEntity.getStatusCode().is2xxSuccessful()) { - if (config.isUseRedisQueueForMsgPersistence()) { - queueProcessor.resetCounter(); - } TbMsg next = processResponse(ctx, msg, responseEntity); ctx.tellSuccess(next); } else { - if (config.isUseRedisQueueForMsgPersistence()) { - processHttpClientError(responseEntity.getStatusCode(), msg, queueProcessor); - } TbMsg next = processFailureResponse(ctx, msg, responseEntity); ctx.tellNext(next, TbRelationTypes.FAILURE); } @@ -183,11 +169,4 @@ class TbHttpClient { } } - private void processHttpClientError(HttpStatus statusCode, TbMsg msg, TbRedisQueueProcessor queueProcessor) { - if (statusCode.is4xxClientError()) { - log.warn("[{}] Client error during message delivering!", msg); - } else { - queueProcessor.pushOnFailure(msg); - } - } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRedisQueueProcessor.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRedisQueueProcessor.java deleted file mode 100644 index 015cced6c6..0000000000 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRedisQueueProcessor.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Copyright © 2016-2020 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.rule.engine.rest; - -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.springframework.data.redis.core.ListOperations; -import org.thingsboard.rule.engine.api.TbContext; -import org.thingsboard.server.common.msg.TbMsg; - -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -@Data -@Slf4j -class TbRedisQueueProcessor { - - private static final int MAX_QUEUE_SIZE = Integer.MAX_VALUE; - - private final TbContext ctx; - private final TbHttpClient httpClient; - private final ExecutorService executor; - private final ListOperations listOperations; - private final String redisKey; - private final boolean trimQueue; - private final int maxQueueSize; - - private AtomicInteger failuresCounter; - private Future future; - - TbRedisQueueProcessor(TbContext ctx, TbHttpClient httpClient, boolean trimQueue, int maxQueueSize) { - this.ctx = ctx; - this.httpClient = httpClient; - this.executor = Executors.newSingleThreadExecutor(); - this.listOperations = ctx.getRedisTemplate().opsForList(); - this.redisKey = constructRedisKey(); - this.trimQueue = trimQueue; - this.maxQueueSize = maxQueueSize; - init(); - } - - private void init() { - failuresCounter = new AtomicInteger(0); - future = executor.submit(() -> { - while (true) { - if (failuresCounter.get() != 0 && failuresCounter.get() % 50 == 0) { - sleep("Target HTTP server is down...", 3); - } - if (listOperations.size(redisKey) > 0) { - List list = listOperations.range(redisKey, -10, -1); - list.forEach(obj -> { - //TODO 2.5: Callback? - TbMsg msg = TbMsg.fromBytes((byte[]) obj, null); - log.debug("Trying to send the message: {}", msg); - listOperations.remove(redisKey, -1, obj); - httpClient.processMessage(ctx, msg, this); - }); - } else { - sleep("Queue is empty, waiting for tasks!", 1); - } - } - }); - } - - void destroy() { - if (future != null) { - future.cancel(true); - } - if (executor != null) { - executor.shutdownNow(); - } - } - - void push(TbMsg msg) { - listOperations.leftPush(redisKey, TbMsg.toByteArray(msg)); - if (trimQueue) { - listOperations.trim(redisKey, 0, validateMaxQueueSize()); - } - } - - void pushOnFailure(TbMsg msg) { - listOperations.rightPush(redisKey, TbMsg.toByteArray(msg)); - failuresCounter.incrementAndGet(); - } - - void resetCounter() { - failuresCounter.set(0); - } - - private String constructRedisKey() { - return ctx.getServiceId() + ctx.getSelfId(); - } - - private int validateMaxQueueSize() { - if (maxQueueSize != 0) { - return maxQueueSize; - } - return MAX_QUEUE_SIZE; - } - - private void sleep(String logMessage, int sleepSeconds) { - try { - log.debug(logMessage); - TimeUnit.SECONDS.sleep(sleepSeconds); - } catch (InterruptedException e) { - throw new IllegalStateException("Thread interrupted!", e); - } - } -} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java index 9b9d275977..9cb171d0dc 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rest/TbRestApiCallNode.java @@ -25,8 +25,6 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; -import java.util.concurrent.ExecutionException; - @Slf4j @RuleNode( type = ComponentType.EXTERNAL, @@ -47,7 +45,6 @@ public class TbRestApiCallNode implements TbNode { private boolean useRedisQueueForMsgPersistence; private TbHttpClient httpClient; - private TbRedisQueueProcessor queueProcessor; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { @@ -55,20 +52,13 @@ public class TbRestApiCallNode implements TbNode { httpClient = new TbHttpClient(config); useRedisQueueForMsgPersistence = config.isUseRedisQueueForMsgPersistence(); if (useRedisQueueForMsgPersistence) { - if (ctx.getRedisTemplate() == null) { - throw new RuntimeException("Redis cache type must be used!"); - } - queueProcessor = new TbRedisQueueProcessor(ctx, httpClient, config.isTrimQueue(), config.getMaxQueueSize()); + log.warn("[{}][{}] Usage of Redis Template is deprecated starting 2.5 and will have no affect", ctx.getTenantId(), ctx.getSelfId()); } } @Override - public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException, TbNodeException { - if (useRedisQueueForMsgPersistence) { - queueProcessor.push(msg); - } else { - httpClient.processMessage(ctx, msg, null); - } + public void onMsg(TbContext ctx, TbMsg msg) { + httpClient.processMessage(ctx, msg); } @Override @@ -76,9 +66,6 @@ public class TbRestApiCallNode implements TbNode { if (this.httpClient != null) { this.httpClient.destroy(); } - if (this.queueProcessor != null) { - this.queueProcessor.destroy(); - } } }