From f01cbe0e368bc320e1efa92ccd5bdb7f8471022c Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 23 Oct 2023 10:10:02 +0300 Subject: [PATCH 1/6] Refactoring ToCoreNotificationMsg: implement replacement for raw bytes messages --- .../device/DeviceActorMessageProcessor.java | 6 +- .../queue/DefaultTbClusterService.java | 17 +- .../queue/DefaultTbCoreConsumerService.java | 39 +- .../server/service/queue/ProtoUtils.java | 404 ++++++++++++++ .../service/queue/TbCoreConsumerStats.java | 6 + .../rpc/FromDeviceRpcResponseActorMsg.java | 18 +- .../server/service/rpc/RemoveRpcActorMsg.java | 15 +- .../rpc/ToDeviceRpcRequestActorMsg.java | 9 +- .../server/service/queue/ProtoUtilsTest.java | 511 +++++++++++++++++- common/cluster-api/src/main/proto/queue.proto | 126 ++++- .../data/plugin/ComponentLifecycleEvent.java | 2 +- .../common/msg/edge/EdgeEventUpdateMsg.java | 17 +- .../common/msg/edge/FromEdgeSyncResponse.java | 8 +- .../common/msg/edge/ToEdgeSyncRequest.java | 9 +- .../common/msg/rpc/FromDeviceRpcResponse.java | 11 +- .../common/msg/rpc/ToDeviceRpcRequest.java | 4 +- .../DeviceAttributesEventNotificationMsg.java | 15 +- .../engine/api/msg/DeviceEdgeUpdateMsg.java | 4 +- .../api/msg/DeviceNameOrTypeUpdateMsg.java | 2 - 19 files changed, 1115 insertions(+), 108 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index b838f0b86d..73cd948bcf 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -271,9 +271,13 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { + return createToDeviceRpcRequestMsg(request, rpcSeq++); + } + + private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request, int rpcSeq) { ToDeviceRpcRequestBody body = request.getBody(); return ToDeviceRpcRequestMsg.newBuilder() - .setRequestId(rpcSeq++) + .setRequestId(rpcSeq) .setMethodName(body.getMethod()) .setParams(body.getParams()) .setExpirationTime(request.getExpirationTime()) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 937ee12fab..53b7ad5215 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -132,8 +132,8 @@ public class DefaultTbClusterService implements TbClusterService { public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); log.trace("PUSHING msg: {} to:{}", msg, tpi); - byte[] msgBytes = encodingService.encode(msg); - ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotificationMsg(ByteString.copyFrom(msgBytes)).build(); + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(proto).build(); producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); toCoreMsgs.incrementAndGet(); } @@ -362,25 +362,24 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); - EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId); - byte[] msgBytes = encodingService.encode(msg); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdateMsg(ByteString.copyFrom(msgBytes)).build(); + TransportProtos.EdgeEventUpdateMsgProto edgeEventUpdateMsgProto = ProtoUtils.toProto(new EdgeEventUpdateMsg(tenantId, edgeId)); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(edgeEventUpdateMsgProto).build(); pushEdgeSyncMsgToCore(edgeId, toCoreMsg); } @Override public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) { log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest); - byte[] msgBytes = encodingService.encode(toEdgeSyncRequest); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequestMsg(ByteString.copyFrom(msgBytes)).build(); + TransportProtos.ToEdgeSyncRequestMsgProto toEdgeSyncRequestMsgProto = ProtoUtils.toProto(toEdgeSyncRequest); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toEdgeSyncRequestMsgProto).build(); pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg); } @Override public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) { log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse); - byte[] msgBytes = encodingService.encode(fromEdgeSyncResponse); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponseMsg(ByteString.copyFrom(msgBytes)).build(); + TransportProtos.FromEdgeSyncResponseMsgProto fromEdgeSyncResponseMsgProto = ProtoUtils.toProto(fromEdgeSyncResponse); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(fromEdgeSyncResponseMsgProto).build(); pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index bae1ea35a6..433f80ebe1 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -264,15 +264,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); - if (actorMsg.isPresent()) { - TbActorMsg tbActorMsg = actorMsg.get(); - if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) { - tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg); + } else if (toCoreMsg.hasToDeviceActorNotification()) { + TbActorMsg actorMsg = ProtoUtils.fromProto(toCoreMsg.getToDeviceActorNotification()); + if (actorMsg != null) { + if (actorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) { + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) actorMsg); } else { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg); + actorContext.tell(actorMsg); } } callback.onSuccess(); @@ -353,12 +352,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { - if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); - } - callback.onSuccess(); + private void forwardToAppActor(UUID id, TbActorMsg actorMsg) { + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg); + actorContext.tell(actorMsg); } private void forwardToEventService(ErrorEventProto eventProto, TbCallback callback) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java index 3406265747..b50cb3c6f8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java @@ -15,13 +15,45 @@ */ package org.thingsboard.server.service.queue; +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg; +import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKey; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.data.rpc.RpcError; +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; +import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; +import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; public class ProtoUtils { @@ -44,4 +76,376 @@ public class ProtoUtils { ); } + public static TransportProtos.ToEdgeSyncRequestMsgProto toProto(ToEdgeSyncRequest request) { + return TransportProtos.ToEdgeSyncRequestMsgProto.newBuilder() + .setTenantIdMSB(request.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(request.getTenantId().getId().getLeastSignificantBits()) + .setRequestIdMSB(request.getId().getMostSignificantBits()) + .setRequestIdLSB(request.getId().getLeastSignificantBits()) + .setEdgeIdMSB(request.getEdgeId().getId().getMostSignificantBits()) + .setEdgeIdLSB(request.getEdgeId().getId().getLeastSignificantBits()) + .build(); + } + + public static ToEdgeSyncRequest fromProto(TransportProtos.ToEdgeSyncRequestMsgProto proto) { + return new ToEdgeSyncRequest( + new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()), + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())) + ); + } + + public static TransportProtos.FromEdgeSyncResponseMsgProto toProto(FromEdgeSyncResponse response) { + return TransportProtos.FromEdgeSyncResponseMsgProto.newBuilder() + .setTenantIdMSB(response.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(response.getTenantId().getId().getLeastSignificantBits()) + .setResponseIdMSB(response.getId().getMostSignificantBits()) + .setResponseIdLSB(response.getId().getLeastSignificantBits()) + .setEdgeIdMSB(response.getEdgeId().getId().getMostSignificantBits()) + .setEdgeIdLSB(response.getEdgeId().getId().getLeastSignificantBits()) + .setSuccess(response.isSuccess()) + .build(); + } + + public static FromEdgeSyncResponse fromProto(TransportProtos.FromEdgeSyncResponseMsgProto proto) { + return new FromEdgeSyncResponse( + new UUID(proto.getResponseIdMSB(), proto.getResponseIdLSB()), + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())), + proto.getSuccess() + ); + } + + public static TransportProtos.EdgeEventUpdateMsgProto toProto(EdgeEventUpdateMsg msg) { + return TransportProtos.EdgeEventUpdateMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setEdgeIdMSB(msg.getEdgeId().getId().getMostSignificantBits()) + .setEdgeIdLSB(msg.getEdgeId().getId().getLeastSignificantBits()) + .build(); + } + + public static EdgeEventUpdateMsg fromProto(TransportProtos.EdgeEventUpdateMsgProto proto) { + return new EdgeEventUpdateMsg( + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())) + ); + } + + private static TransportProtos.DeviceEdgeUpdateMsgProto toProto(DeviceEdgeUpdateMsg msg) { + return TransportProtos.DeviceEdgeUpdateMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setEdgeIdMSB(msg.getEdgeId().getId().getMostSignificantBits()) + .setEdgeIdLSB(msg.getEdgeId().getId().getLeastSignificantBits()) + .build(); + } + + private static DeviceEdgeUpdateMsg fromProto(TransportProtos.DeviceEdgeUpdateMsgProto proto) { + return new DeviceEdgeUpdateMsg( + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + new EdgeId(new UUID(proto.getEdgeIdMSB(), proto.getEdgeIdLSB())) + ); + } + + private static TransportProtos.DeviceNameOrTypeUpdateMsgProto toProto(DeviceNameOrTypeUpdateMsg msg) { + return TransportProtos.DeviceNameOrTypeUpdateMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setDeviceName(msg.getDeviceName()) + .setDeviceType(msg.getDeviceType()) + .build(); + } + + private static DeviceNameOrTypeUpdateMsg fromProto(TransportProtos.DeviceNameOrTypeUpdateMsgProto proto) { + return new DeviceNameOrTypeUpdateMsg( + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + proto.getDeviceName(), + proto.getDeviceType() + ); + } + + private static TransportProtos.DeviceAttributesEventMsgProto toProto(DeviceAttributesEventNotificationMsg msg) { + TransportProtos.DeviceAttributesEventMsgProto.Builder builder = TransportProtos.DeviceAttributesEventMsgProto.newBuilder(); + builder.setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setDeleted(msg.isDeleted()); + + if (msg.getScope() != null) { + builder.setScope(TransportProtos.AttributeScopeProto.valueOf(msg.getScope())); + } + + if (msg.getDeletedKeys() != null) { + for (AttributeKey key : msg.getDeletedKeys()) { + builder.addDeletedKeys(TransportProtos.AttributeKey.newBuilder() + .setScope(TransportProtos.AttributeScopeProto.valueOf(key.getScope())) + .setAttributeKey(key.getAttributeKey()) + .build()); + } + } + + if (msg.getValues() != null) { + for (AttributeKvEntry attributeKvEntry : msg.getValues()) { + TransportProtos.AttributeValueProto.Builder attributeValueBuilder = TransportProtos.AttributeValueProto.newBuilder() + .setLastUpdateTs(attributeKvEntry.getLastUpdateTs()) + .setKey(attributeKvEntry.getKey()); + switch (attributeKvEntry.getDataType()) { + case BOOLEAN: + attributeKvEntry.getBooleanValue().ifPresent(attributeValueBuilder::setBoolV); + attributeValueBuilder.setHasV(attributeKvEntry.getBooleanValue().isPresent()); + attributeValueBuilder.setType(TransportProtos.KeyValueType.BOOLEAN_V); + break; + case STRING: + attributeKvEntry.getStrValue().ifPresent(attributeValueBuilder::setStringV); + attributeValueBuilder.setHasV(attributeKvEntry.getStrValue().isPresent()); + attributeValueBuilder.setType(TransportProtos.KeyValueType.STRING_V); + break; + case DOUBLE: + attributeKvEntry.getDoubleValue().ifPresent(attributeValueBuilder::setDoubleV); + attributeValueBuilder.setHasV(attributeKvEntry.getDoubleValue().isPresent()); + attributeValueBuilder.setType(TransportProtos.KeyValueType.DOUBLE_V); + break; + case LONG: + attributeKvEntry.getLongValue().ifPresent(attributeValueBuilder::setLongV); + attributeValueBuilder.setHasV(attributeKvEntry.getLongValue().isPresent()); + attributeValueBuilder.setType(TransportProtos.KeyValueType.LONG_V); + break; + case JSON: + attributeKvEntry.getJsonValue().ifPresent(attributeValueBuilder::setJsonV); + attributeValueBuilder.setHasV(attributeKvEntry.getJsonValue().isPresent()); + attributeValueBuilder.setType(TransportProtos.KeyValueType.JSON_V); + break; + } + builder.addValues(attributeValueBuilder.build()); + } + } + return builder.build(); + } + + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceAttributesEventMsgProto proto) { + return new DeviceAttributesEventNotificationMsg( + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + getAttributeKeySetFromProto(proto.getDeletedKeysList()), + proto.hasScope() ? proto.getScope().name() : null, + getAttributesKvEntryFromProto(proto.getValuesList()), + proto.getDeleted() + ); + } + + private static TransportProtos.DeviceCredentialsUpdateMsgProto toProto(DeviceCredentialsUpdateNotificationMsg msg) { + TransportProtos.DeviceCredentialsProto.Builder protoBuilder = TransportProtos.DeviceCredentialsProto.newBuilder() + .setDeviceIdMSB(msg.getDeviceCredentials().getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceCredentials().getDeviceId().getId().getLeastSignificantBits()) + .setCredentialsId(msg.getDeviceCredentials().getCredentialsId()) + .setCredentialsType(TransportProtos.CredentialsType.valueOf(msg.getDeviceCredentials().getCredentialsType().name())); + + if (msg.getDeviceCredentials().getCredentialsValue() != null) { + protoBuilder.setCredentialsValue(msg.getDeviceCredentials().getCredentialsValue()); + } + + return TransportProtos.DeviceCredentialsUpdateMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setDeviceCredentials(protoBuilder.build()) + .build(); + } + + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.DeviceCredentialsUpdateMsgProto proto) { + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(new DeviceId(new UUID(proto.getDeviceCredentials().getDeviceIdMSB(), proto.getDeviceCredentials().getDeviceIdLSB()))); + deviceCredentials.setCredentialsId(proto.getDeviceCredentials().getCredentialsId()); + deviceCredentials.setCredentialsValue(proto.getDeviceCredentials().hasCredentialsValue() ? proto.getDeviceCredentials().getCredentialsValue() : null); + deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(proto.getDeviceCredentials().getCredentialsType().name())); + return new DeviceCredentialsUpdateNotificationMsg( + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + deviceCredentials + ); + } + + private static TransportProtos.ToDeviceRpcRequestActorMsgProto toProto(ToDeviceRpcRequestActorMsg msg) { + TransportProtos.ToDeviceRpcRequestMsg proto = TransportProtos.ToDeviceRpcRequestMsg.newBuilder() + .setMethodName(msg.getMsg().getBody().getMethod()) + .setParams(msg.getMsg().getBody().getParams()) + .setExpirationTime(msg.getMsg().getExpirationTime()) + .setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits()) + .setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits()) + .setOneway(msg.getMsg().isOneway()) + .build(); + + return TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setServiceId(msg.getServiceId()) + .setToDeviceRpcRequestMsg(proto) + .build(); + } + + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.ToDeviceRpcRequestActorMsgProto proto) { + TransportProtos.ToDeviceRpcRequestMsg toDeviceRpcRequestMsg = proto.getToDeviceRpcRequestMsg(); + ToDeviceRpcRequest toDeviceRpcRequest = new ToDeviceRpcRequest( + new UUID(toDeviceRpcRequestMsg.getRequestIdMSB(), toDeviceRpcRequestMsg.getRequestIdLSB()), + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + toDeviceRpcRequestMsg.getOneway(), + toDeviceRpcRequestMsg.getExpirationTime(), + new ToDeviceRpcRequestBody(toDeviceRpcRequestMsg.getMethodName(), toDeviceRpcRequestMsg.getParams()), + toDeviceRpcRequestMsg.getPersisted(), 0, ""); + return new ToDeviceRpcRequestActorMsg(proto.getServiceId(), toDeviceRpcRequest); + } + + private static TransportProtos.FromDeviceRpcResponseActorMsgProto toProto(FromDeviceRpcResponseActorMsg msg) { + TransportProtos.FromDeviceRPCResponseProto.Builder builder = TransportProtos.FromDeviceRPCResponseProto.newBuilder() + .setRequestIdMSB(msg.getMsg().getId().getMostSignificantBits()) + .setRequestIdLSB(msg.getMsg().getId().getLeastSignificantBits()) + .setError(msg.getMsg().getError().isPresent() ? msg.getMsg().getError().get().ordinal() : -1); + if (msg.getMsg().getResponse().isPresent()) { + builder.setResponse(msg.getMsg().getResponse().get()); + } + + return TransportProtos.FromDeviceRpcResponseActorMsgProto.newBuilder() + .setRequestId(msg.getRequestId()) + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setRpcResponse(builder.build()) + .build(); + } + + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.FromDeviceRpcResponseActorMsgProto proto) { + FromDeviceRpcResponse fromDeviceRpcResponse = new FromDeviceRpcResponse( + new UUID(proto.getRpcResponse().getRequestIdMSB(), proto.getRpcResponse().getRequestIdLSB()), + proto.getRpcResponse().getResponse(), + proto.getRpcResponse().getError() >= 0 ? RpcError.values()[proto.getRpcResponse().getError()] : null); + return new FromDeviceRpcResponseActorMsg( + proto.getRequestId(), + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + fromDeviceRpcResponse + ); + } + + private static TransportProtos.RemoveRpcActorMsgProto toProto(RemoveRpcActorMsg msg) { + return TransportProtos.RemoveRpcActorMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setRequestIdMSB(msg.getRequestId().getMostSignificantBits()) + .setRequestIdLSB(msg.getRequestId().getLeastSignificantBits()) + .build(); + } + + private static ToDeviceActorNotificationMsg fromProto(TransportProtos.RemoveRpcActorMsgProto proto) { + return new RemoveRpcActorMsg( + TenantId.fromUUID(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())), + new DeviceId(new UUID(proto.getDeviceIdMSB(), proto.getDeviceIdLSB())), + new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) + ); + } + + public static TransportProtos.ToDeviceActorNotificationMsgProto toProto(ToDeviceActorNotificationMsg msg) { + if (msg instanceof DeviceEdgeUpdateMsg) { + DeviceEdgeUpdateMsg updateMsg = (DeviceEdgeUpdateMsg) msg; + TransportProtos.DeviceEdgeUpdateMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(proto).build(); + } else if (msg instanceof DeviceNameOrTypeUpdateMsg) { + DeviceNameOrTypeUpdateMsg updateMsg = (DeviceNameOrTypeUpdateMsg) msg; + TransportProtos.DeviceNameOrTypeUpdateMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(proto).build(); + } else if (msg instanceof DeviceAttributesEventNotificationMsg) { + DeviceAttributesEventNotificationMsg updateMsg = (DeviceAttributesEventNotificationMsg) msg; + TransportProtos.DeviceAttributesEventMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(proto).build(); + } else if (msg instanceof DeviceCredentialsUpdateNotificationMsg) { + DeviceCredentialsUpdateNotificationMsg updateMsg = (DeviceCredentialsUpdateNotificationMsg) msg; + TransportProtos.DeviceCredentialsUpdateMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(proto).build(); + } else if (msg instanceof ToDeviceRpcRequestActorMsg) { + ToDeviceRpcRequestActorMsg updateMsg = (ToDeviceRpcRequestActorMsg) msg; + TransportProtos.ToDeviceRpcRequestActorMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(proto).build(); + } else if (msg instanceof FromDeviceRpcResponseActorMsg) { + FromDeviceRpcResponseActorMsg updateMsg = (FromDeviceRpcResponseActorMsg) msg; + TransportProtos.FromDeviceRpcResponseActorMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(proto).build(); + } else if (msg instanceof RemoveRpcActorMsg) { + RemoveRpcActorMsg updateMsg = (RemoveRpcActorMsg) msg; + TransportProtos.RemoveRpcActorMsgProto proto = toProto(updateMsg); + return TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(proto).build(); + } + return null; + } + + public static ToDeviceActorNotificationMsg fromProto(TransportProtos.ToDeviceActorNotificationMsgProto proto) { + if (proto.hasDeviceEdgeUpdateMsg()) { + return fromProto(proto.getDeviceEdgeUpdateMsg()); + } else if (proto.hasDeviceNameOrTypeMsg()) { + return fromProto(proto.getDeviceNameOrTypeMsg()); + } else if (proto.hasDeviceAttributesEventMsg()) { + return fromProto(proto.getDeviceAttributesEventMsg()); + } else if (proto.hasDeviceCredentialsUpdateMsg()) { + return fromProto(proto.getDeviceCredentialsUpdateMsg()); + } else if (proto.hasToDeviceRpcRequestMsg()) { + return fromProto(proto.getToDeviceRpcRequestMsg()); + } else if (proto.hasFromDeviceRpcResponseMsg()) { + return fromProto(proto.getFromDeviceRpcResponseMsg()); + } else if (proto.hasRemoveRpcActorMsg()) { + return fromProto(proto.getRemoveRpcActorMsg()); + } + return null; + } + + private static Set getAttributeKeySetFromProto(List deletedKeysList) { + if (deletedKeysList.isEmpty()) { + return null; + } + return deletedKeysList.stream() + .map(attributeKey -> new AttributeKey(attributeKey.getScope().name(), attributeKey.getAttributeKey())) + .collect(Collectors.toSet()); + } + + private static List getAttributesKvEntryFromProto(List valuesList) { + if (valuesList.isEmpty()) { + return null; + } + List result = new ArrayList<>(); + for (TransportProtos.AttributeValueProto kvEntry : valuesList) { + boolean hasValue = kvEntry.getHasV(); + KvEntry entry = null; + switch (kvEntry.getType()) { + case BOOLEAN_V: + entry = new BooleanDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getBoolV() : null); + break; + case LONG_V: + entry = new LongDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getLongV() : null); + break; + case DOUBLE_V: + entry = new DoubleDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getDoubleV() : null); + break; + case STRING_V: + entry = new StringDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getStringV() : null); + break; + case JSON_V: + entry = new JsonDataEntry(kvEntry.getKey(), hasValue ? kvEntry.getJsonV() : null); + break; + } + result.add(new BaseAttributeKvEntry(kvEntry.getLastUpdateTs(), entry)); + } + return result; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 9107159615..5ff72e97e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -172,10 +172,16 @@ public class TbCoreConsumerStats { toCoreNfComponentLifecycleCounter.increment(); } else if (!msg.getComponentLifecycleMsg().isEmpty()) { toCoreNfComponentLifecycleCounter.increment(); + } else if (msg.hasEdgeEventUpdate()) { + toCoreNfEdgeEventUpdateCounter.increment(); } else if (!msg.getEdgeEventUpdateMsg().isEmpty()) { toCoreNfEdgeEventUpdateCounter.increment(); + } else if (msg.hasToEdgeSyncRequest()) { + toCoreNfEdgeSyncRequestCounter.increment(); } else if (!msg.getToEdgeSyncRequestMsg().isEmpty()) { toCoreNfEdgeSyncRequestCounter.increment(); + } else if (msg.hasFromEdgeSyncResponse()) { + toCoreNfEdgeSyncResponseCounter.increment(); } else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) { toCoreNfEdgeSyncResponseCounter.increment(); } else if (msg.hasQueueUpdateMsg()) { diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java index 2a3e30adcf..138a1965f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/FromDeviceRpcResponseActorMsg.java @@ -15,27 +15,21 @@ */ package org.thingsboard.server.service.rpc; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; +import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; -@ToString -@RequiredArgsConstructor +@Data public class FromDeviceRpcResponseActorMsg implements ToDeviceActorNotificationMsg { - @Getter - private final Integer requestId; - @Getter - private final TenantId tenantId; - @Getter - private final DeviceId deviceId; + private static final long serialVersionUID = -6648120137236354987L; - @Getter + private final Integer requestId; + private final TenantId tenantId; + private final DeviceId deviceId; private final FromDeviceRpcResponse msg; @Override diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java index c0b0c64b2d..77fc46c93c 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/RemoveRpcActorMsg.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.service.rpc; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; +import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; @@ -25,16 +23,13 @@ import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; import java.util.UUID; -@ToString -@RequiredArgsConstructor +@Data public class RemoveRpcActorMsg implements ToDeviceActorNotificationMsg { - @Getter - private final TenantId tenantId; - @Getter - private final DeviceId deviceId; + private static final long serialVersionUID = -6112720854949677477L; - @Getter + private final TenantId tenantId; + private final DeviceId deviceId; private final UUID requestId; @Override diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java index 64c8887edb..9c128a296f 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/ToDeviceRpcRequestActorMsg.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.service.rpc; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; +import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; @@ -27,15 +25,12 @@ import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; /** * Created by ashvayka on 16.04.18. */ -@ToString -@RequiredArgsConstructor +@Data public class ToDeviceRpcRequestActorMsg implements ToDeviceActorNotificationMsg { private static final long serialVersionUID = -8592877558138716589L; - @Getter private final String serviceId; - @Getter private final ToDeviceRpcRequest msg; @Override diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java index c63055a6f1..6678c9d14d 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java @@ -15,14 +15,39 @@ */ package org.thingsboard.server.service.queue; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.rule.engine.api.msg.DeviceEdgeUpdateMsg; +import org.thingsboard.rule.engine.api.msg.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; +import org.thingsboard.server.common.data.rpc.RpcError; +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; +import org.thingsboard.server.common.data.security.DeviceCredentials; +import org.thingsboard.server.common.data.security.DeviceCredentialsType; +import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; +import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg; +import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; +import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.gen.transport.TransportProtos; -import java.util.UUID; +import org.thingsboard.server.service.rpc.FromDeviceRpcResponseActorMsg; +import org.thingsboard.server.service.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; + +import java.util.List; +import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -30,6 +55,10 @@ class ProtoUtilsTest { TenantId tenantId = TenantId.fromUUID(UUID.fromString("35e10f77-16e7-424d-ae46-ee780f87ac4f")); EntityId entityId = new RuleChainId(UUID.fromString("c640b635-4f0f-41e6-b10b-25a86003094e")); + DeviceId deviceId = new DeviceId(UUID.fromString("ceebb9e5-4239-437c-a507-dc5f71f1232d")); + EdgeId edgeId = new EdgeId(UUID.fromString("364be452-2183-459b-af93-1ddb325feac1")); + UUID id = UUID.fromString("31a07d85-6ed5-46f8-83c0-6715cb0a8782"); + @Test void toProtoComponentLifecycleMsg() { ComponentLifecycleMsg msg = new ComponentLifecycleMsg(tenantId, entityId, ComponentLifecycleEvent.UPDATED); @@ -68,4 +97,482 @@ class ProtoUtilsTest { assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); } -} \ No newline at end of file + @Test + void toProtoEdgeEventUpdateMsg() { + EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId); + + TransportProtos.EdgeEventUpdateMsgProto proto = ProtoUtils.toProto(msg); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.EdgeEventUpdateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .build() + ); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoEdgeEventUpdateMsg() { + TransportProtos.EdgeEventUpdateMsgProto proto = TransportProtos.EdgeEventUpdateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .build(); + + EdgeEventUpdateMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new EdgeEventUpdateMsg(tenantId, edgeId)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoEdgeSyncRequestMsg() { + ToEdgeSyncRequest msg = new ToEdgeSyncRequest(id, tenantId, edgeId); + + TransportProtos.ToEdgeSyncRequestMsgProto proto = ProtoUtils.toProto(msg); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToEdgeSyncRequestMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .build() + ); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoEdgeSyncRequestMsg() { + TransportProtos.ToEdgeSyncRequestMsgProto proto = TransportProtos.ToEdgeSyncRequestMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .build(); + + ToEdgeSyncRequest msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new ToEdgeSyncRequest(id, tenantId, edgeId)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoEdgeSyncResponseMsg() { + FromEdgeSyncResponse msg = new FromEdgeSyncResponse(id, tenantId, edgeId, true); + + TransportProtos.FromEdgeSyncResponseMsgProto proto = ProtoUtils.toProto(msg); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.FromEdgeSyncResponseMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setResponseIdMSB(id.getMostSignificantBits()) + .setResponseIdLSB(id.getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .setSuccess(true) + .build() + ); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoEdgeSyncResponseMsg() { + TransportProtos.FromEdgeSyncResponseMsgProto proto = TransportProtos.FromEdgeSyncResponseMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setResponseIdMSB(id.getMostSignificantBits()) + .setResponseIdLSB(id.getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .setSuccess(true) + .build(); + + FromEdgeSyncResponse msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new FromEdgeSyncResponse(id, tenantId, edgeId, true)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoDeviceEdgeUpdateMsg() { + DeviceEdgeUpdateMsg msg = new DeviceEdgeUpdateMsg(tenantId, deviceId, edgeId); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.DeviceEdgeUpdateMsgProto deviceProto = TransportProtos.DeviceEdgeUpdateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(deviceProto).build()); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoDeviceEdgeUpdateMsg() { + TransportProtos.DeviceEdgeUpdateMsgProto deviceProto = TransportProtos.DeviceEdgeUpdateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setEdgeIdMSB(edgeId.getId().getMostSignificantBits()) + .setEdgeIdLSB(edgeId.getId().getLeastSignificantBits()) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceEdgeUpdateMsg(deviceProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new DeviceEdgeUpdateMsg(tenantId, deviceId, edgeId)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoDeviceNameOrTypeUpdateMsg() { + String deviceName = "test", deviceType = "test"; + DeviceNameOrTypeUpdateMsg msg = new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.DeviceNameOrTypeUpdateMsgProto deviceProto = TransportProtos.DeviceNameOrTypeUpdateMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setDeviceName(msg.getDeviceName()) + .setDeviceType(msg.getDeviceType()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(deviceProto).build()); + + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoDeviceNameOrTypeUpdateMsg() { + String deviceName = "test", deviceType = "test"; + TransportProtos.DeviceNameOrTypeUpdateMsgProto deviceProto = TransportProtos.DeviceNameOrTypeUpdateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setDeviceName(deviceName) + .setDeviceType(deviceType) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceNameOrTypeMsg(deviceProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new DeviceNameOrTypeUpdateMsg(tenantId, deviceId, deviceName, deviceType)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoDeviceAttributesEventMsg() { + long ts = System.currentTimeMillis(); + List list = List.of(new BaseAttributeKvEntry(ts, new StringDataEntry("key", "value"))); + DeviceAttributesEventNotificationMsg msg = new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, "CLIENT_SCOPE", list, false); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.DeviceAttributesEventMsgProto deviceProto = TransportProtos.DeviceAttributesEventMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setScope(TransportProtos.AttributeScopeProto.CLIENT_SCOPE) + .setDeleted(false) + .addValues(TransportProtos.AttributeValueProto.newBuilder() + .setLastUpdateTs(ts) + .setHasV(true) + .setKey("key") + .setStringV("value") + .setType(TransportProtos.KeyValueType.STRING_V).build()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(deviceProto).build()); + + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoDeviceAttributesEventMsg() { + long ts = System.currentTimeMillis(); + TransportProtos.DeviceAttributesEventMsgProto deviceProto = TransportProtos.DeviceAttributesEventMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setScope(TransportProtos.AttributeScopeProto.CLIENT_SCOPE) + .setDeleted(false) + .addValues(TransportProtos.AttributeValueProto.newBuilder() + .setLastUpdateTs(ts) + .setHasV(true) + .setKey("key") + .setStringV("value") + .setType(TransportProtos.KeyValueType.STRING_V).build()) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceAttributesEventMsg(deviceProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, "CLIENT_SCOPE", + List.of(new BaseAttributeKvEntry(ts, new StringDataEntry("key", "value"))), false)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoDeviceCredentialsUpdateMsg() { + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(deviceId); + deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + deviceCredentials.setCredentialsValue("test"); + deviceCredentials.setCredentialsId("test"); + DeviceCredentialsUpdateNotificationMsg msg = new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId, deviceCredentials); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.DeviceCredentialsUpdateMsgProto deviceCredentialsProto = TransportProtos.DeviceCredentialsUpdateMsgProto.newBuilder() + .setTenantIdMSB(msg.getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(msg.getTenantId().getId().getLeastSignificantBits()) + .setDeviceIdMSB(msg.getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceId().getId().getLeastSignificantBits()) + .setDeviceCredentials(TransportProtos.DeviceCredentialsProto.newBuilder() + .setDeviceIdMSB(msg.getDeviceCredentials().getDeviceId().getId().getMostSignificantBits()) + .setDeviceIdLSB(msg.getDeviceCredentials().getDeviceId().getId().getLeastSignificantBits()) + .setCredentialsId(msg.getDeviceCredentials().getCredentialsId()) + .setCredentialsValue(msg.getDeviceCredentials().getCredentialsValue()) + .setCredentialsType(TransportProtos.CredentialsType.valueOf(msg.getDeviceCredentials().getCredentialsType().name())) + .build()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(deviceCredentialsProto).build()); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoDeviceCredentialsUpdateMsg() { + DeviceCredentials deviceCredentials = new DeviceCredentials(); + deviceCredentials.setDeviceId(deviceId); + deviceCredentials.setCredentialsType(DeviceCredentialsType.ACCESS_TOKEN); + deviceCredentials.setCredentialsValue("test"); + deviceCredentials.setCredentialsId("test"); + + TransportProtos.DeviceCredentialsUpdateMsgProto deviceCredentialsProto = TransportProtos.DeviceCredentialsUpdateMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setDeviceCredentials(TransportProtos.DeviceCredentialsProto.newBuilder() + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setCredentialsId(deviceCredentials.getCredentialsId()) + .setCredentialsValue(deviceCredentials.getCredentialsValue()) + .setCredentialsType(TransportProtos.CredentialsType.valueOf(deviceCredentials.getCredentialsType().name())) + .build()) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setDeviceCredentialsUpdateMsg(deviceCredentialsProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new DeviceCredentialsUpdateNotificationMsg(tenantId, deviceId, deviceCredentials)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoDeviceRpcRequestActorMsg() { + String serviceId = "cadcaac6-85c3-4211-9756-f074dcd1e7f7"; + ToDeviceRpcRequest request = new ToDeviceRpcRequest(id, tenantId, deviceId, true, 0, new ToDeviceRpcRequestBody("method", "params"), false, 0, ""); + ToDeviceRpcRequestActorMsg msg = new ToDeviceRpcRequestActorMsg(serviceId, request); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.ToDeviceRpcRequestActorMsgProto deviceProto = TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setServiceId(serviceId) + .setToDeviceRpcRequestMsg(TransportProtos.ToDeviceRpcRequestMsg.newBuilder() + .setRequestId(0) + .setMethodName("method") + .setParams("params") + .setExpirationTime(0) + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .setOneway(true) + .build()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(deviceProto).build()); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoDeviceRpcRequestActorMsg() { + String serviceId = "cadcaac6-85c3-4211-9756-f074dcd1e7f7"; + ToDeviceRpcRequest request = new ToDeviceRpcRequest(id, tenantId, deviceId, true, 0, new ToDeviceRpcRequestBody("method", "params"), false, 0, ""); + + TransportProtos.ToDeviceRpcRequestActorMsgProto deviceProto = TransportProtos.ToDeviceRpcRequestActorMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setServiceId(serviceId) + .setToDeviceRpcRequestMsg(TransportProtos.ToDeviceRpcRequestMsg.newBuilder() + .setRequestId(0) + .setMethodName("method") + .setParams("params") + .setExpirationTime(0) + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .setOneway(true) + .build()) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setToDeviceRpcRequestMsg(deviceProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new ToDeviceRpcRequestActorMsg(serviceId, request)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoDeviceRpcResponseActorMsg() { + FromDeviceRpcResponse response = new FromDeviceRpcResponse(id, "response", RpcError.NOT_FOUND); + FromDeviceRpcResponseActorMsg msg = new FromDeviceRpcResponseActorMsg(23, tenantId, deviceId, response); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.FromDeviceRpcResponseActorMsgProto deviceProto = TransportProtos.FromDeviceRpcResponseActorMsgProto.newBuilder() + .setRequestId(23) + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setRpcResponse(TransportProtos.FromDeviceRPCResponseProto.newBuilder() + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .setError(RpcError.NOT_FOUND.ordinal()) + .setResponse("response").build()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(deviceProto).build()); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoDeviceRpcResponseActorMsg() { + FromDeviceRpcResponse response = new FromDeviceRpcResponse(id, "response", RpcError.NOT_FOUND); + + TransportProtos.FromDeviceRpcResponseActorMsgProto deviceProto = TransportProtos.FromDeviceRpcResponseActorMsgProto.newBuilder() + .setRequestId(23) + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setRpcResponse(TransportProtos.FromDeviceRPCResponseProto.newBuilder() + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .setError(RpcError.NOT_FOUND.ordinal()) + .setResponse("response").build()) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setFromDeviceRpcResponseMsg(deviceProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new FromDeviceRpcResponseActorMsg(23, tenantId, deviceId, response)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } + + @Test + void toProtoRemoveRpcActorMsg() { + RemoveRpcActorMsg msg = new RemoveRpcActorMsg(tenantId, deviceId, id); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); + Assertions.assertNotNull(proto); + + TransportProtos.RemoveRpcActorMsgProto rpcProto = TransportProtos.RemoveRpcActorMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .build(); + + assertThat(proto).as("to proto").isEqualTo(TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(rpcProto).build()); + + assertThat(ProtoUtils.fromProto(proto)).as("from proto").isEqualTo(msg); + } + + @Test + void fromProtoRemoveRpcActorMsg() { + TransportProtos.RemoveRpcActorMsgProto rpcProto = TransportProtos.RemoveRpcActorMsgProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setRequestIdMSB(id.getMostSignificantBits()) + .setRequestIdLSB(id.getLeastSignificantBits()) + .build(); + + TransportProtos.ToDeviceActorNotificationMsgProto proto = TransportProtos.ToDeviceActorNotificationMsgProto.newBuilder().setRemoveRpcActorMsg(rpcProto).build(); + + ToDeviceActorNotificationMsg msg = ProtoUtils.fromProto(proto); + + assertThat(msg).as("from proto").isEqualTo( + new RemoveRpcActorMsg(tenantId, deviceId, id)); + + assertThat(ProtoUtils.toProto(msg)).as("to proto").isEqualTo(proto); + } +} diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index d8439d0cd0..927142bb22 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -128,6 +128,17 @@ message KeyValueProto { string json_v = 7; } +enum AttributeScopeProto { + CLIENT_SCOPE = 0; + SERVER_SCOPE = 1; + SHARED_SCOPE = 2; +} + +message AttributeKey { + AttributeScopeProto scope = 1; + string attributeKey = 2; +} + message AttributeValueProto { int64 lastUpdateTs = 1; KeyValueType type = 2; @@ -137,6 +148,7 @@ message AttributeValueProto { double double_v = 6; string string_v = 7; string json_v = 8; + optional string key = 9; } message TsKvProto { @@ -459,7 +471,7 @@ message DeviceCredentialsProto { int64 deviceIdLSB = 2; CredentialsType credentialsType = 3; string credentialsId = 4; - string credentialsValue = 5; + optional string credentialsValue = 5; } message CredentialsDataProto { @@ -797,6 +809,106 @@ message EdgeNotificationMsgProto { PostAttributeMsg postAttributesMsg = 12; } +message EdgeEventUpdateMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 edgeIdMSB = 3; + int64 edgeIdLSB = 4; +} + +message ToEdgeSyncRequestMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 requestIdMSB = 3; + int64 requestIdLSB = 4; + int64 edgeIdMSB = 5; + int64 edgeIdLSB = 6; +} + +message FromEdgeSyncResponseMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 responseIdMSB = 3; + int64 responseIdLSB = 4; + int64 edgeIdMSB = 5; + int64 edgeIdLSB = 6; + bool success = 7; +} + +message DeviceEdgeUpdateMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + int64 edgeIdMSB = 5; + int64 edgeIdLSB = 6; +} + +message DeviceNameOrTypeUpdateMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + string deviceName = 5; + string deviceType = 6; +} + +message DeviceAttributesEventMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + repeated AttributeKey deletedKeys = 5; + optional AttributeScopeProto scope = 6; + repeated AttributeValueProto values = 7; + bool deleted = 8; +} + +message DeviceCredentialsUpdateMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + DeviceCredentialsProto deviceCredentials = 5; +} + +message ToDeviceRpcRequestActorMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + string serviceId = 5; + ToDeviceRpcRequestMsg toDeviceRpcRequestMsg = 6; +} + +message FromDeviceRpcResponseActorMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 deviceIdMSB = 3; + int64 deviceIdLSB = 4; + int32 requestId = 5; + FromDeviceRPCResponseProto rpcResponse = 6; +} + +message RemoveRpcActorMsgProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 requestIdMSB = 3; + int64 requestIdLSB = 4; + int64 deviceIdMSB = 5; + int64 deviceIdLSB = 6; +} + +message ToDeviceActorNotificationMsgProto { + DeviceEdgeUpdateMsgProto deviceEdgeUpdateMsg = 1; + DeviceNameOrTypeUpdateMsgProto deviceNameOrTypeMsg = 2; + DeviceAttributesEventMsgProto deviceAttributesEventMsg = 3; + DeviceCredentialsUpdateMsgProto deviceCredentialsUpdateMsg = 4; + ToDeviceRpcRequestActorMsgProto toDeviceRpcRequestMsg = 5; + FromDeviceRpcResponseActorMsgProto fromDeviceRpcResponseMsg = 6; + RemoveRpcActorMsgProto removeRpcActorMsg = 7; +} + /** TB Core to Version Control Service */ @@ -1022,12 +1134,13 @@ message ToCoreMsg { TransportToDeviceActorMsg toDeviceActorMsg = 1; DeviceStateServiceMsgProto deviceStateServiceMsg = 2; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 3; - bytes toDeviceActorNotificationMsg = 4; + bytes toDeviceActorNotificationMsg = 4 [deprecated = true]; EdgeNotificationMsgProto edgeNotificationMsg = 5; DeviceActivityProto deviceActivityMsg = 6; NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = 7; LifecycleEventProto lifecycleEventMsg = 8; ErrorEventProto errorEventMsg = 9; + ToDeviceActorNotificationMsgProto toDeviceActorNotification = 10; } /* High priority messages with low latency are handled by ThingsBoard Core Service separately */ @@ -1036,15 +1149,18 @@ message ToCoreNotificationMsg { LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; bytes componentLifecycleMsg = 3; //will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto - bytes edgeEventUpdateMsg = 4; + bytes edgeEventUpdateMsg = 4 [deprecated = true]; QueueUpdateMsg queueUpdateMsg = 5; QueueDeleteMsg queueDeleteMsg = 6; VersionControlResponseMsg vcResponseMsg = 7; - bytes toEdgeSyncRequestMsg = 8; - bytes fromEdgeSyncResponseMsg = 9; + bytes toEdgeSyncRequestMsg = 8 [deprecated = true]; + bytes fromEdgeSyncResponseMsg = 9 [deprecated = true]; SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10; NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11; ComponentLifecycleMsgProto componentLifecycle = 12; + EdgeEventUpdateMsgProto edgeEventUpdate = 13; + ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 14; + FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 15; } /* Messages that are handled by ThingsBoard RuleEngine Service */ diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java index 969baee088..316a5cfdea 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/plugin/ComponentLifecycleEvent.java @@ -23,4 +23,4 @@ import java.io.Serializable; public enum ComponentLifecycleEvent implements Serializable { // In sync with ComponentLifecycleEvent proto CREATED, STARTED, ACTIVATED, SUSPENDED, UPDATED, STOPPED, DELETED -} \ No newline at end of file +} diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java index 30d95ac28c..f8c7bd437f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/EdgeEventUpdateMsg.java @@ -15,23 +15,18 @@ */ package org.thingsboard.server.common.msg.edge; -import lombok.Getter; -import lombok.ToString; +import lombok.Data; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; -@ToString +@Data public class EdgeEventUpdateMsg implements EdgeSessionMsg { - @Getter - private final TenantId tenantId; - @Getter - private final EdgeId edgeId; - public EdgeEventUpdateMsg(TenantId tenantId, EdgeId edgeId) { - this.tenantId = tenantId; - this.edgeId = edgeId; - } + private static final long serialVersionUID = -8050114506822836537L; + + private final TenantId tenantId; + private final EdgeId edgeId; @Override public MsgType getMsgType() { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java index 74c2d96b40..f9302fcb3f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/FromEdgeSyncResponse.java @@ -15,18 +15,18 @@ */ package org.thingsboard.server.common.msg.edge; -import lombok.AllArgsConstructor; -import lombok.Getter; +import lombok.Data; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import java.util.UUID; -@AllArgsConstructor -@Getter +@Data public class FromEdgeSyncResponse implements EdgeSessionMsg { + private static final long serialVersionUID = -6360890886315347486L; + private final UUID id; private final TenantId tenantId; private final EdgeId edgeId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java index 06778089e2..f6324ba5d0 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/edge/ToEdgeSyncRequest.java @@ -15,17 +15,18 @@ */ package org.thingsboard.server.common.msg.edge; -import lombok.AllArgsConstructor; -import lombok.Getter; +import lombok.Data; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import java.util.UUID; -@AllArgsConstructor -@Getter +@Data public class ToEdgeSyncRequest implements EdgeSessionMsg { + + private static final long serialVersionUID = -7624597032448212259L; + private final UUID id; private final TenantId tenantId; private final EdgeId edgeId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/FromDeviceRpcResponse.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/FromDeviceRpcResponse.java index 75c2f35a5e..0ef89e460e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/FromDeviceRpcResponse.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/FromDeviceRpcResponse.java @@ -15,9 +15,7 @@ */ package org.thingsboard.server.common.msg.rpc; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.ToString; +import lombok.Data; import org.thingsboard.server.common.data.rpc.RpcError; import java.io.Serializable; @@ -27,10 +25,11 @@ import java.util.UUID; /** * @author Andrew Shvayka */ -@RequiredArgsConstructor -@ToString +@Data public class FromDeviceRpcResponse implements Serializable { - @Getter + + private static final long serialVersionUID = -3799452502112373491L; + private final UUID id; private final String response; private final RpcError error; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java index 08785e4c31..846022bdc8 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java @@ -29,6 +29,9 @@ import java.util.UUID; */ @Data public class ToDeviceRpcRequest implements Serializable { + + private static final long serialVersionUID = -7089247105087346214L; + private final UUID id; private final TenantId tenantId; private final DeviceId deviceId; @@ -40,4 +43,3 @@ public class ToDeviceRpcRequest implements Serializable { @JsonIgnore private final String additionalInfo; } - diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java index 215322bde3..544445b58c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceAttributesEventNotificationMsg.java @@ -15,9 +15,7 @@ */ package org.thingsboard.rule.engine.api.msg; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.ToString; +import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKey; @@ -32,21 +30,16 @@ import java.util.Set; /** * @author Andrew Shvayka */ -@ToString -@AllArgsConstructor +@Data public class DeviceAttributesEventNotificationMsg implements ToDeviceActorNotificationMsg { - @Getter + private static final long serialVersionUID = 2422071590415277039L; + private final TenantId tenantId; - @Getter private final DeviceId deviceId; - @Getter private final Set deletedKeys; - @Getter private final String scope; - @Getter private final List values; - @Getter private final boolean deleted; public static DeviceAttributesEventNotificationMsg onUpdate(TenantId tenantId, DeviceId deviceId, String scope, List values) { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java index 96fc263b81..f969f2f895 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceEdgeUpdateMsg.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.api.msg; -import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -24,9 +23,10 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; @Data -@AllArgsConstructor public class DeviceEdgeUpdateMsg implements ToDeviceActorNotificationMsg { + private static final long serialVersionUID = 4679029228395462172L; + private final TenantId tenantId; private final DeviceId deviceId; private final EdgeId edgeId; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java index b85899e06d..0bca6a5a74 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/msg/DeviceNameOrTypeUpdateMsg.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.api.msg; -import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; @@ -23,7 +22,6 @@ import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; @Data -@AllArgsConstructor public class DeviceNameOrTypeUpdateMsg implements ToDeviceActorNotificationMsg { private static final long serialVersionUID = -5738949227650536685L; From dd73ae900ba9c3ddb3df18dd44ede4a50ce13673 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 23 Oct 2023 11:11:40 +0300 Subject: [PATCH 2/6] Remove deprecated coreMsgs from log --- .../server/service/queue/TbCoreConsumerStats.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 5ff72e97e3..b7f1147ec0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -174,16 +174,10 @@ public class TbCoreConsumerStats { toCoreNfComponentLifecycleCounter.increment(); } else if (msg.hasEdgeEventUpdate()) { toCoreNfEdgeEventUpdateCounter.increment(); - } else if (!msg.getEdgeEventUpdateMsg().isEmpty()) { - toCoreNfEdgeEventUpdateCounter.increment(); } else if (msg.hasToEdgeSyncRequest()) { toCoreNfEdgeSyncRequestCounter.increment(); - } else if (!msg.getToEdgeSyncRequestMsg().isEmpty()) { - toCoreNfEdgeSyncRequestCounter.increment(); } else if (msg.hasFromEdgeSyncResponse()) { toCoreNfEdgeSyncResponseCounter.increment(); - } else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) { - toCoreNfEdgeSyncResponseCounter.increment(); } else if (msg.hasQueueUpdateMsg()) { toCoreNfQueueUpdateCounter.increment(); } else if (msg.hasQueueDeleteMsg()) { From 00000f7d0551da81ea87702a3286f610ad4a1728 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Thu, 26 Oct 2023 09:30:04 +0300 Subject: [PATCH 3/6] Leave old messaged, mark them as deprecated, for removal; Left intentionally to avoid throwNotHandled --- .../server/actors/device/DeviceActor.java | 6 ++--- .../device/DeviceActorMessageProcessor.java | 8 +++--- .../controller/TelemetryController.java | 2 +- .../telemetry/BaseTelemetryProcessor.java | 2 +- .../DefaultTbNotificationEntityService.java | 2 +- .../ota/DefaultOtaPackageStateService.java | 2 +- .../queue/DefaultTbClusterService.java | 4 +-- .../queue/DefaultTbCoreConsumerService.java | 27 +++++++++++++++++++ .../server/service/queue/ProtoUtils.java | 8 +++--- .../service/queue/TbCoreConsumerStats.java | 6 +++++ .../DefaultSubscriptionManagerService.java | 2 +- .../server/service/queue/ProtoUtilsTest.java | 8 +++--- .../engine}/DeviceAttributes.java | 2 +- .../DeviceAttributesEventNotificationMsg.java | 2 +- ...eviceCredentialsUpdateNotificationMsg.java | 2 +- .../engine}/DeviceEdgeUpdateMsg.java | 2 +- .../engine}/DeviceMetaData.java | 2 +- .../engine}/DeviceNameOrTypeUpdateMsg.java | 2 +- 18 files changed, 61 insertions(+), 28 deletions(-) rename common/message/src/main/java/org/thingsboard/server/common/msg/{ruleengine => rule/engine}/DeviceAttributes.java (98%) rename common/message/src/main/java/org/thingsboard/server/common/msg/{ruleengine => rule/engine}/DeviceAttributesEventNotificationMsg.java (97%) rename common/message/src/main/java/org/thingsboard/server/common/msg/{ruleengine => rule/engine}/DeviceCredentialsUpdateNotificationMsg.java (96%) rename common/message/src/main/java/org/thingsboard/server/common/msg/{ruleengine => rule/engine}/DeviceEdgeUpdateMsg.java (95%) rename common/message/src/main/java/org/thingsboard/server/common/msg/{ruleengine => rule/engine}/DeviceMetaData.java (94%) rename common/message/src/main/java/org/thingsboard/server/common/msg/{ruleengine => rule/engine}/DeviceNameOrTypeUpdateMsg.java (95%) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java index 17119232cc..48425d58db 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActor.java @@ -16,9 +16,9 @@ package org.thingsboard.server.actors.device; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorException; diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index d87548487c..cd7a7be4df 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -25,10 +25,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.LinkedHashMapRemoveEldest; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 829a6dd040..5f88752ff4 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -45,7 +45,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.StringUtils; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index fafd2d1c4d..52bf7dbaa2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -29,7 +29,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java index 1b1fa4a4b4..72e8f44020 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/DefaultTbNotificationEntityService.java @@ -19,7 +19,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.common.msg.ruleengine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.HasName; diff --git a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java index a6bce41ce0..fef96c0fcb 100644 --- a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java @@ -20,7 +20,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 7955b52393..7241c0e571 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -23,8 +23,8 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.msg.ruleengine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.Device; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 8d725b837c..56038214d0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -279,6 +279,19 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); + if (actorMsg.isPresent()) { + TbActorMsg tbActorMsg = actorMsg.get(); + if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) { + tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg); + } else { + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); + actorContext.tell(actorMsg.get()); + } + } + callback.onSuccess(); } else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) { TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg(); log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg()); @@ -359,12 +372,21 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { + actorMsg.ifPresent(tbActorMsg -> forwardToAppActor(id, tbActorMsg)); + callback.onSuccess(); + } + private void forwardToAppActor(UUID id, TbActorMsg actorMsg) { log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg); actorContext.tell(actorMsg); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java index 9d6e215c39..539f26ef03 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java @@ -15,10 +15,10 @@ */ package org.thingsboard.server.service.queue; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index b7f1147ec0..5ff72e97e3 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -174,10 +174,16 @@ public class TbCoreConsumerStats { toCoreNfComponentLifecycleCounter.increment(); } else if (msg.hasEdgeEventUpdate()) { toCoreNfEdgeEventUpdateCounter.increment(); + } else if (!msg.getEdgeEventUpdateMsg().isEmpty()) { + toCoreNfEdgeEventUpdateCounter.increment(); } else if (msg.hasToEdgeSyncRequest()) { toCoreNfEdgeSyncRequestCounter.increment(); + } else if (!msg.getToEdgeSyncRequestMsg().isEmpty()) { + toCoreNfEdgeSyncRequestCounter.increment(); } else if (msg.hasFromEdgeSyncResponse()) { toCoreNfEdgeSyncResponseCounter.increment(); + } else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) { + toCoreNfEdgeSyncResponseCounter.increment(); } else if (msg.hasQueueUpdateMsg()) { toCoreNfQueueUpdateCounter.increment(); } else if (msg.hasQueueDeleteMsg()) { diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index b92aa8cd44..01461f87d3 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -21,7 +21,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java index 0024c6b259..a1ac31a68f 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java @@ -17,10 +17,10 @@ package org.thingsboard.server.service.queue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.thingsboard.server.common.msg.ruleengine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.ruleengine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceAttributes.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceAttributes.java similarity index 98% rename from common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceAttributes.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceAttributes.java index f7f042485a..00079558e6 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceAttributes.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceAttributes.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.msg.ruleengine; +package org.thingsboard.server.common.msg.rule.engine; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.kv.AttributeKey; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceAttributesEventNotificationMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceAttributesEventNotificationMsg.java similarity index 97% rename from common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceAttributesEventNotificationMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceAttributesEventNotificationMsg.java index 93db8efd11..d0124fbb01 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceAttributesEventNotificationMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceAttributesEventNotificationMsg.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.msg.ruleengine; +package org.thingsboard.server.common.msg.rule.engine; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceCredentialsUpdateNotificationMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceCredentialsUpdateNotificationMsg.java similarity index 96% rename from common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceCredentialsUpdateNotificationMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceCredentialsUpdateNotificationMsg.java index e4edbc2311..b13026ba38 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceCredentialsUpdateNotificationMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceCredentialsUpdateNotificationMsg.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.msg.ruleengine; +package org.thingsboard.server.common.msg.rule.engine; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceEdgeUpdateMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceEdgeUpdateMsg.java similarity index 95% rename from common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceEdgeUpdateMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceEdgeUpdateMsg.java index 99411f7043..99e52e31b0 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceEdgeUpdateMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceEdgeUpdateMsg.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.msg.ruleengine; +package org.thingsboard.server.common.msg.rule.engine; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceMetaData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceMetaData.java similarity index 94% rename from common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceMetaData.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceMetaData.java index 21302c0f3c..f7fbb7b1ef 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceMetaData.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceMetaData.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.msg.ruleengine; +package org.thingsboard.server.common.msg.rule.engine; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceNameOrTypeUpdateMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceNameOrTypeUpdateMsg.java similarity index 95% rename from common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceNameOrTypeUpdateMsg.java rename to common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceNameOrTypeUpdateMsg.java index db6ff91b22..13b6a8558f 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/ruleengine/DeviceNameOrTypeUpdateMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/rule/engine/DeviceNameOrTypeUpdateMsg.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.msg.ruleengine; +package org.thingsboard.server.common.msg.rule.engine; import lombok.Data; import org.thingsboard.server.common.data.id.DeviceId; From b98567ddfb9b422c774026538865594a0c11a2f2 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Thu, 26 Oct 2023 09:31:53 +0300 Subject: [PATCH 4/6] Minor refactoring --- .../server/actors/device/DeviceActorMessageProcessor.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java index cd7a7be4df..50e237ac73 100644 --- a/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/device/DeviceActorMessageProcessor.java @@ -271,13 +271,9 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso } private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request) { - return createToDeviceRpcRequestMsg(request, rpcSeq++); - } - - private ToDeviceRpcRequestMsg createToDeviceRpcRequestMsg(ToDeviceRpcRequest request, int rpcSeq) { ToDeviceRpcRequestBody body = request.getBody(); return ToDeviceRpcRequestMsg.newBuilder() - .setRequestId(rpcSeq) + .setRequestId(rpcSeq++) .setMethodName(body.getMethod()) .setParams(body.getParams()) .setExpirationTime(request.getExpirationTime()) From dc6956cb3da1d9d1a65f6edf242f745239c17762 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 31 Oct 2023 09:21:10 +0200 Subject: [PATCH 5/6] Add tests required after review --- .../server/service/queue/ProtoUtilsTest.java | 38 +++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java index a1ac31a68f..1a696b730d 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ProtoUtilsTest.java @@ -17,17 +17,17 @@ package org.thingsboard.server.service.queue; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKey; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.data.rpc.RpcError; @@ -39,13 +39,18 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.gen.transport.TransportProtos; import java.util.List; +import java.util.Set; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -68,7 +73,7 @@ class ProtoUtilsTest { @Test void protoEntityTypeSerialization() { - for(EntityType entityType : EntityType.values()){ + for (EntityType entityType : EntityType.values()) { assertThat(ProtoUtils.fromProto(ProtoUtils.toProto(entityType))).as(entityType.getNormalName()).isEqualTo(entityType); } } @@ -115,6 +120,25 @@ class ProtoUtilsTest { TransportProtos.ToDeviceActorNotificationMsgProto serializedMsg = ProtoUtils.toProto(msg); Assertions.assertNotNull(serializedMsg); assertThat(ProtoUtils.fromProto(serializedMsg)).as("deserialized").isEqualTo(msg); + + msg = new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, "SERVER_SCOPE", + List.of(new BaseAttributeKvEntry(System.currentTimeMillis(), new DoubleDataEntry("doubleEntry", 231.5)), + new BaseAttributeKvEntry(System.currentTimeMillis(), new JsonDataEntry("jsonEntry", "jsonValue"))), false); + serializedMsg = ProtoUtils.toProto(msg); + Assertions.assertNotNull(serializedMsg); + assertThat(ProtoUtils.fromProto(serializedMsg)).as("deserialized").isEqualTo(msg); + + msg = new DeviceAttributesEventNotificationMsg(tenantId, deviceId, null, "SERVER_SCOPE", + List.of(new BaseAttributeKvEntry(System.currentTimeMillis(), new DoubleDataEntry("entry", 11.3)), + new BaseAttributeKvEntry(System.currentTimeMillis(), new BooleanDataEntry("jsonEntry", true))), false); + serializedMsg = ProtoUtils.toProto(msg); + Assertions.assertNotNull(serializedMsg); + assertThat(ProtoUtils.fromProto(serializedMsg)).as("deserialized").isEqualTo(msg); + + msg = new DeviceAttributesEventNotificationMsg(tenantId, deviceId, Set.of(new AttributeKey("SHARED_SCOPE", "attributeKey")), null, null, true); + serializedMsg = ProtoUtils.toProto(msg); + Assertions.assertNotNull(serializedMsg); + assertThat(ProtoUtils.fromProto(serializedMsg)).as("deserialized").isEqualTo(msg); } @Test From ddfac584a0f4a144faba422a0dcaaed8d84b8741 Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Tue, 31 Oct 2023 09:58:41 +0200 Subject: [PATCH 6/6] Make componentLifecycle bytes deprecated. Cleaning code with a single line for TbClusterService --- .../queue/DefaultTbClusterService.java | 21 +++++++++---------- .../queue/DefaultTbCoreConsumerService.java | 4 +++- .../server/service/queue/ProtoUtils.java | 14 ++++++------- common/cluster-api/src/main/proto/queue.proto | 4 ++-- 4 files changed, 22 insertions(+), 21 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 7241c0e571..d5aa187603 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -23,8 +23,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.Device; @@ -57,6 +55,8 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -81,6 +81,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import static org.thingsboard.server.service.queue.ProtoUtils.toProto; + @Service @Slf4j @RequiredArgsConstructor @@ -132,8 +134,7 @@ public class DefaultTbClusterService implements TbClusterService { public void pushMsgToCore(ToDeviceActorNotificationMsg msg, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, msg.getTenantId(), msg.getDeviceId()); log.trace("PUSHING msg: {} to:{}", msg, tpi); - TransportProtos.ToDeviceActorNotificationMsgProto proto = ProtoUtils.toProto(msg); - ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(proto).build(); + ToCoreMsg toCoreMsg = ToCoreMsg.newBuilder().setToDeviceActorNotification(toProto(msg)).build(); producerProvider.getTbCoreMsgProducer().send(tpi, new TbProtoQueueMsg<>(msg.getDeviceId().getId(), toCoreMsg), callback); toCoreMsgs.incrementAndGet(); } @@ -362,24 +363,22 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId) { log.trace("[{}] Processing edge {} event update ", tenantId, edgeId); - TransportProtos.EdgeEventUpdateMsgProto edgeEventUpdateMsgProto = ProtoUtils.toProto(new EdgeEventUpdateMsg(tenantId, edgeId)); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(edgeEventUpdateMsgProto).build(); + EdgeEventUpdateMsg msg = new EdgeEventUpdateMsg(tenantId, edgeId); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setEdgeEventUpdate(toProto(msg)).build(); pushEdgeSyncMsgToCore(edgeId, toCoreMsg); } @Override public void pushEdgeSyncRequestToCore(ToEdgeSyncRequest toEdgeSyncRequest) { log.trace("[{}] Processing edge sync request {} ", toEdgeSyncRequest.getTenantId(), toEdgeSyncRequest); - TransportProtos.ToEdgeSyncRequestMsgProto toEdgeSyncRequestMsgProto = ProtoUtils.toProto(toEdgeSyncRequest); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toEdgeSyncRequestMsgProto).build(); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setToEdgeSyncRequest(toProto(toEdgeSyncRequest)).build(); pushEdgeSyncMsgToCore(toEdgeSyncRequest.getEdgeId(), toCoreMsg); } @Override public void pushEdgeSyncResponseToCore(FromEdgeSyncResponse fromEdgeSyncResponse) { log.trace("[{}] Processing edge sync response {}", fromEdgeSyncResponse.getTenantId(), fromEdgeSyncResponse); - TransportProtos.FromEdgeSyncResponseMsgProto fromEdgeSyncResponseMsgProto = ProtoUtils.toProto(fromEdgeSyncResponse); - ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(fromEdgeSyncResponseMsgProto).build(); + ToCoreNotificationMsg toCoreMsg = ToCoreNotificationMsg.newBuilder().setFromEdgeSyncResponse(toProto(fromEdgeSyncResponse)).build(); pushEdgeSyncMsgToCore(fromEdgeSyncResponse.getEdgeId(), toCoreMsg); } @@ -394,7 +393,7 @@ public class DefaultTbClusterService implements TbClusterService { } private void broadcast(ComponentLifecycleMsg msg) { - TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = ProtoUtils.toProto(msg); + TransportProtos.ComponentLifecycleMsgProto componentLifecycleMsgProto = toProto(msg); TbQueueProducer> toRuleEngineProducer = producerProvider.getRuleEngineNotificationsMsgProducer(); Set tbRuleEngineServices = partitionService.getAllServiceIds(ServiceType.TB_RULE_ENGINE); EntityType entityType = msg.getEntityId().getEntityType(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 56038214d0..57be0c8de4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -664,7 +664,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg, TbCallback callback) { - actorMsg.ifPresent(tbActorMsg -> forwardToAppActor(id, tbActorMsg)); + if (actorMsg.isPresent()) { + forwardToAppActor(id, actorMsg.get()); + } callback.onSuccess(); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java index 539f26ef03..76a8e3e89a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/ProtoUtils.java @@ -15,10 +15,6 @@ */ package org.thingsboard.server.service.queue; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; -import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EdgeId; @@ -44,11 +40,15 @@ import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse; import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponseActorMsg; import org.thingsboard.server.common.msg.rpc.RemoveRpcActorMsg; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceCredentialsUpdateNotificationMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; +import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.gen.transport.TransportProtos; import java.util.ArrayList; import java.util.Arrays; @@ -416,7 +416,7 @@ public class ProtoUtils { } else if (proto.hasDeviceNameOrTypeMsg()) { return fromProto(proto.getDeviceNameOrTypeMsg()); } else if (proto.hasDeviceAttributesEventMsg()) { - return fromProto(proto.getDeviceAttributesEventMsg()); + return fromProto(proto.getDeviceAttributesEventMsg()); } else if (proto.hasDeviceCredentialsUpdateMsg()) { return fromProto(proto.getDeviceCredentialsUpdateMsg()); } else if (proto.hasToDeviceRpcRequestMsg()) { diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index f79cf0b731..480a7f3ba4 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -1152,7 +1152,7 @@ message ToCoreMsg { message ToCoreNotificationMsg { LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; - bytes componentLifecycleMsg = 3; //will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto + bytes componentLifecycleMsg = 3 [deprecated = true]; bytes edgeEventUpdateMsg = 4 [deprecated = true]; QueueUpdateMsg queueUpdateMsg = 5; QueueDeleteMsg queueDeleteMsg = 6; @@ -1177,7 +1177,7 @@ message ToRuleEngineMsg { } message ToRuleEngineNotificationMsg { - bytes componentLifecycleMsg = 1; // will be removed in 3.6.1 in favour of ComponentLifecycleMsgProto + bytes componentLifecycleMsg = 1 [deprecated = true]; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; QueueUpdateMsg queueUpdateMsg = 3; QueueDeleteMsg queueDeleteMsg = 4;