diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 5d59955e2b..9a331bb208 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -156,11 +156,15 @@ public class DefaultTbClusterService implements TbClusterService { private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) { if (deviceProfile != null) { RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId(); - if (targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId())) { - tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId); - } String targetQueueName = deviceProfile.getDefaultQueueName(); - if (targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName())) { + boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId()); + boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName()); + + if (isRuleChainTransform && isQueueTransform) { + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueName); + } else if (isRuleChainTransform) { + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId); + } else if (isQueueTransform) { tbMsg = TbMsg.transformMsg(tbMsg, targetQueueName); } } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 5ff8527327..844a4d0216 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -95,6 +95,11 @@ public final class TbMsg implements Serializable { origMsg.data, origMsg.getRuleChainId(), null, origMsg.getCallback()); } + public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId, String queueName) { + return new TbMsg(queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType, + origMsg.data, ruleChainId, null, origMsg.getCallback()); + } + public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 3f37685383..aabb8c927e 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -23,7 +23,6 @@ import com.google.gson.JsonObject; import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.DeviceProfile; @@ -53,21 +52,46 @@ import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; import org.thingsboard.server.common.transport.auth.TransportDeviceInfo; import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsResponse; -import org.thingsboard.server.common.transport.limits.TransportRateLimit; import org.thingsboard.server.common.transport.limits.TransportRateLimitService; import org.thingsboard.server.common.transport.limits.TransportRateLimitType; import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.common.transport.util.JsonUtils; -import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.EntityDeleteMsg; +import org.thingsboard.server.gen.transport.TransportProtos.EntityUpdateMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionCloseNotificationProto; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent; +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; +import org.thingsboard.server.gen.transport.TransportProtos.SessionType; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToAttributeUpdatesMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscribeToRPCMsg; +import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToDeviceRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ToServerRpcResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; +import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueMsgMetadata; @@ -237,14 +261,14 @@ public class DefaultTransportService implements TransportService { } @Override - public void registerAsyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener) { - sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, TransportProtos.SessionType.ASYNC, listener)); + public void registerAsyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener) { + sessions.putIfAbsent(toSessionId(sessionInfo), new SessionMetaData(sessionInfo, SessionType.ASYNC, listener)); } @Override - public TransportProtos.GetEntityProfileResponseMsg getRoutingInfo(TransportProtos.GetEntityProfileRequestMsg msg) { - TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); + public GetEntityProfileResponseMsg getRoutingInfo(GetEntityProfileRequestMsg msg) { + TbProtoQueueMsg protoMsg = + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); try { TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); return response.getValue().getEntityProfileResponseMsg(); @@ -254,7 +278,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(DeviceTransportType transportType, TransportProtos.ValidateDeviceTokenRequestMsg msg, + public void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), @@ -263,7 +287,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(DeviceTransportType transportType, TransportProtos.ValidateBasicMqttCredRequestMsg msg, + public void process(DeviceTransportType transportType, ValidateBasicMqttCredRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), @@ -272,7 +296,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(DeviceTransportType transportType, TransportProtos.ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback callback) { + public void process(DeviceTransportType transportType, ValidateDeviceX509CertRequestMsg msg, TransportServiceCallback callback) { log.trace("Processing msg: {}", msg); TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setValidateX509CertRequestMsg(msg).build()); doProcess(transportType, protoMsg, callback); @@ -281,7 +305,7 @@ public class DefaultTransportService implements TransportService { private void doProcess(DeviceTransportType transportType, TbProtoQueueMsg protoMsg, TransportServiceCallback callback) { ListenableFuture response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> { - TransportProtos.ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateCredResponseMsg(); + ValidateDeviceCredentialsResponseMsg msg = tmp.getValue().getValidateCredResponseMsg(); ValidateDeviceCredentialsResponse.ValidateDeviceCredentialsResponseBuilder result = ValidateDeviceCredentialsResponse.builder(); if (msg.hasDeviceInfo()) { result.credentials(msg.getCredentialsBody()); @@ -304,11 +328,11 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback callback) { + public void process(GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback callback) { TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build()); log.trace("Processing msg: {}", requestMsg); ListenableFuture response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> { - TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg(); + GetOrCreateDeviceFromGatewayResponseMsg msg = tmp.getValue().getGetOrCreateDeviceResponseMsg(); GetOrCreateDeviceFromGatewayResponse.GetOrCreateDeviceFromGatewayResponseBuilder result = GetOrCreateDeviceFromGatewayResponse.builder(); if (msg.hasDeviceInfo()) { TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo()); @@ -323,7 +347,7 @@ public class DefaultTransportService implements TransportService { AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor); } - private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) { + private TransportDeviceInfo getTransportDeviceInfo(DeviceInfoProto di) { TransportDeviceInfo tdi = new TransportDeviceInfo(); tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB()))); tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB()))); @@ -345,7 +369,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscriptionInfoProto msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, SubscriptionInfoProto msg, TransportServiceCallback callback) { if (log.isTraceEnabled()) { log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); } @@ -354,7 +378,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SessionEventMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) @@ -363,9 +387,9 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostTelemetryMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback callback) { int dataPoints = 0; - for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { + for (TsKvListProto tsKv : msg.getTsKvListList()) { dataPoints += tsKv.getKvCount(); } if (checkLimits(sessionInfo, msg, callback, dataPoints, TELEMETRY)) { @@ -373,22 +397,19 @@ public class DefaultTransportService implements TransportService { TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), callback); - for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { + for (TsKvListProto tsKv : msg.getTsKvListList()) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("ts", tsKv.getTs() + ""); JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); - RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); - TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_TELEMETRY_REQUEST.name(), - deviceId, metaData, gson.toJson(json), ruleChainId, null); - sendToRuleEngine(tenantId, tbMsg, packCallback); + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_TELEMETRY_REQUEST, packCallback); } } } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback, msg.getKvCount(), TELEMETRY)) { reportActivityInternal(sessionInfo); TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); @@ -398,15 +419,12 @@ public class DefaultTransportService implements TransportService { metaData.putValue("deviceName", sessionInfo.getDeviceName()); metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("notifyDevice", "false"); - RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); - TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), - deviceId, metaData, gson.toJson(json), ruleChainId, null); - sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, new TransportTbQueueCallback(callback)); } } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetAttributeRequestMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, GetAttributeRequestMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) @@ -415,7 +433,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); @@ -425,7 +443,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.SubscribeToRPCMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, SubscribeToRPCMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); @@ -435,7 +453,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) @@ -450,13 +468,13 @@ public class DefaultTransportService implements TransportService { if (md != null) { SessionMsgListener listener = md.getListener(); transportCallbackExecutor.submit(() -> { - TransportProtos.ToServerRpcResponseMsg responseMsg = - TransportProtos.ToServerRpcResponseMsg.newBuilder() + ToServerRpcResponseMsg responseMsg = + ToServerRpcResponseMsg.newBuilder() .setRequestId(data.getRequestId()) .setError("timeout").build(); listener.onToServerRpcResponse(responseMsg); }); - if (md.getSessionType() == TransportProtos.SessionType.SYNC) { + if (md.getSessionType() == SessionType.SYNC) { deregisterSession(md.getSessionInfo()); } } else { @@ -466,7 +484,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToServerRpcRequestMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, ToServerRpcRequestMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); UUID sessionId = toSessionId(sessionInfo); @@ -482,10 +500,9 @@ public class DefaultTransportService implements TransportService { metaData.putValue("requestId", Integer.toString(msg.getRequestId())); metaData.putValue("serviceId", serviceInfoProvider.getServiceId()); metaData.putValue("sessionId", sessionId.toString()); - RuleChainId ruleChainId = resolveRuleChainId(sessionInfo); - TbMsg tbMsg = TbMsg.newMsg(ServiceQueue.MAIN, SessionMsgType.TO_SERVER_RPC_REQUEST.name(), - deviceId, metaData, gson.toJson(json), ruleChainId, null); - sendToRuleEngine(tenantId, tbMsg, new TransportTbQueueCallback(callback)); + + sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, + SessionMsgType.TO_SERVER_RPC_REQUEST, new TransportTbQueueCallback(callback)); String requestId = sessionId + "-" + msg.getRequestId(); toServerRpcPendingMap.put(requestId, new RpcRequestMetadata(sessionId, msg.getRequestId())); schedulerExecutor.schedule(() -> processTimeout(requestId), clientSideRpcTimeout, TimeUnit.MILLISECONDS); @@ -493,7 +510,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ClaimDeviceMsg msg, TransportServiceCallback callback) { + public void process(SessionInfoProto sessionInfo, ClaimDeviceMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) @@ -502,11 +519,11 @@ public class DefaultTransportService implements TransportService { } @Override - public void reportActivity(TransportProtos.SessionInfoProto sessionInfo) { + public void reportActivity(SessionInfoProto sessionInfo) { reportActivityInternal(sessionInfo); } - private SessionMetaData reportActivityInternal(TransportProtos.SessionInfoProto sessionInfo) { + private SessionMetaData reportActivityInternal(SessionInfoProto sessionInfo) { UUID sessionId = toSessionId(sessionInfo); SessionMetaData sessionMetaData = sessions.get(sessionId); if (sessionMetaData != null) { @@ -519,7 +536,7 @@ public class DefaultTransportService implements TransportService { long expTime = System.currentTimeMillis() - sessionInactivityTimeout; sessions.forEach((uuid, sessionMD) -> { long lastActivityTime = sessionMD.getLastActivityTime(); - TransportProtos.SessionInfoProto sessionInfo = sessionMD.getSessionInfo(); + SessionInfoProto sessionInfo = sessionMD.getSessionInfo(); if (sessionInfo.getGwSessionIdMSB() > 0 && sessionInfo.getGwSessionIdLSB() > 0) { SessionMetaData gwMetaData = sessions.get(new UUID(sessionInfo.getGwSessionIdMSB(), sessionInfo.getGwSessionIdLSB())); @@ -531,13 +548,13 @@ public class DefaultTransportService implements TransportService { if (log.isDebugEnabled()) { log.debug("[{}] Session has expired due to last activity time: {}", toSessionId(sessionInfo), lastActivityTime); } - process(sessionInfo, getSessionEventMsg(TransportProtos.SessionEvent.CLOSED), null); + process(sessionInfo, getSessionEventMsg(SessionEvent.CLOSED), null); sessions.remove(uuid); - sessionMD.getListener().onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); + sessionMD.getListener().onRemoteSessionCloseCommand(SessionCloseNotificationProto.getDefaultInstance()); } else { if (lastActivityTime > sessionMD.getLastReportedActivityTime()) { final long lastActivityTimeFinal = lastActivityTime; - process(sessionInfo, TransportProtos.SubscriptionInfoProto.newBuilder() + process(sessionInfo, SubscriptionInfoProto.newBuilder() .setAttributeSubscription(sessionMD.isSubscribedToAttributes()) .setRpcSubscription(sessionMD.isSubscribedToRPC()) .setLastActivityTime(lastActivityTime).build(), new TransportServiceCallback() { @@ -557,12 +574,12 @@ public class DefaultTransportService implements TransportService { } @Override - public void registerSyncSession(TransportProtos.SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { - SessionMetaData currentSession = new SessionMetaData(sessionInfo, TransportProtos.SessionType.SYNC, listener); + public void registerSyncSession(SessionInfoProto sessionInfo, SessionMsgListener listener, long timeout) { + SessionMetaData currentSession = new SessionMetaData(sessionInfo, SessionType.SYNC, listener); sessions.putIfAbsent(toSessionId(sessionInfo), currentSession); ScheduledFuture executorFuture = schedulerExecutor.schedule(() -> { - listener.onRemoteSessionCloseCommand(TransportProtos.SessionCloseNotificationProto.getDefaultInstance()); + listener.onRemoteSessionCloseCommand(SessionCloseNotificationProto.getDefaultInstance()); deregisterSession(sessionInfo); }, timeout, TimeUnit.MILLISECONDS); @@ -570,7 +587,7 @@ public class DefaultTransportService implements TransportService { } @Override - public void deregisterSession(TransportProtos.SessionInfoProto sessionInfo) { + public void deregisterSession(SessionInfoProto sessionInfo) { SessionMetaData currentSession = sessions.get(toSessionId(sessionInfo)); if (currentSession != null && currentSession.hasScheduledFuture()) { log.debug("Stopping scheduler to avoid resending response if request has been ack."); @@ -583,12 +600,12 @@ public class DefaultTransportService implements TransportService { private TransportRateLimitType[] TELEMETRY = TransportRateLimitType.values(); @Override - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback) { + public boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback) { return checkLimits(sessionInfo, msg, callback, 0, DEFAULT); } @Override - public boolean checkLimits(TransportProtos.SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback, int dataPoints, TransportRateLimitType... limits) { + public boolean checkLimits(SessionInfoProto sessionInfo, Object msg, TransportServiceCallback callback, int dataPoints, TransportRateLimitType... limits) { if (log.isTraceEnabled()) { log.trace("[{}] Processing msg: {}", toSessionId(sessionInfo), msg); } @@ -609,7 +626,7 @@ public class DefaultTransportService implements TransportService { } } - protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) { + protected void processToTransportMsg(ToTransportMsg toSessionMsg) { UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); SessionMetaData md = sessions.get(sessionId); if (md != null) { @@ -633,12 +650,12 @@ public class DefaultTransportService implements TransportService { listener.onToServerRpcResponse(toSessionMsg.getToServerResponse()); } }); - if (md.getSessionType() == TransportProtos.SessionType.SYNC) { + if (md.getSessionType() == SessionType.SYNC) { deregisterSession(md.getSessionInfo()); } } else { if (toSessionMsg.hasEntityUpdateMsg()) { - TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); + EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); EntityType entityType = EntityType.valueOf(msg.getEntityType()); if (EntityType.DEVICE_PROFILE.equals(entityType)) { DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData()); @@ -659,7 +676,7 @@ public class DefaultTransportService implements TransportService { } } } else if (toSessionMsg.hasEntityDeleteMsg()) { - TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); + EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); EntityType entityType = EntityType.valueOf(msg.getEntityType()); UUID entityUuid = new UUID(msg.getEntityIdMSB(), msg.getEntityIdLSB()); if (EntityType.DEVICE_PROFILE.equals(entityType)) { @@ -690,29 +707,29 @@ public class DefaultTransportService implements TransportService { }); } - protected UUID toSessionId(TransportProtos.SessionInfoProto sessionInfo) { + protected UUID toSessionId(SessionInfoProto sessionInfo) { return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB()); } - protected UUID getRoutingKey(TransportProtos.SessionInfoProto sessionInfo) { + protected UUID getRoutingKey(SessionInfoProto sessionInfo) { return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()); } - protected TenantId getTenantId(TransportProtos.SessionInfoProto sessionInfo) { + protected TenantId getTenantId(SessionInfoProto sessionInfo) { return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); } - protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) { + protected DeviceId getDeviceId(SessionInfoProto sessionInfo) { return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); } - public static TransportProtos.SessionEventMsg getSessionEventMsg(TransportProtos.SessionEvent event) { - return TransportProtos.SessionEventMsg.newBuilder() - .setSessionType(TransportProtos.SessionType.ASYNC) + public static SessionEventMsg getSessionEventMsg(SessionEvent event) { + return SessionEventMsg.newBuilder() + .setSessionType(SessionType.ASYNC) .setEvent(event).build(); } - protected void sendToDeviceActor(TransportProtos.SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback callback) { + protected void sendToDeviceActor(SessionInfoProto sessionInfo, TransportToDeviceActorMsg toDeviceActorMsg, TransportServiceCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, getTenantId(sessionInfo), getDeviceId(sessionInfo)); if (log.isTraceEnabled()) { log.trace("[{}][{}] Pushing to topic {} message {}", getTenantId(sessionInfo), getDeviceId(sessionInfo), tpi.getFullTopicName(), toDeviceActorMsg); @@ -740,17 +757,25 @@ public class DefaultTransportService implements TransportService { ruleEngineMsgProducer.send(tpi, new TbProtoQueueMsg<>(tbMsg.getId(), msg), wrappedCallback); } - private RuleChainId resolveRuleChainId(TransportProtos.SessionInfoProto sessionInfo) { + protected void sendToRuleEngine(TenantId tenantId, DeviceId deviceId, SessionInfoProto sessionInfo, JsonObject json, + TbMsgMetaData metaData, SessionMsgType sessionMsgType, TbQueueCallback callback) { DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); RuleChainId ruleChainId; + String queueName; + if (deviceProfile == null) { log.warn("[{}] Device profile is null!", deviceProfileId); ruleChainId = null; + queueName = ServiceQueue.MAIN; } else { ruleChainId = deviceProfile.getDefaultRuleChainId(); + String defaultQueueName = deviceProfile.getDefaultQueueName(); + queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN; } - return ruleChainId; + + TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, metaData, gson.toJson(json), ruleChainId, null); + sendToRuleEngine(tenantId, tbMsg, callback); } private class TransportTbQueueCallback implements TbQueueCallback {