From 8ccd70fc6571b9a413f0c0f941d025b1c82d4432 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Wed, 20 Dec 2023 17:33:04 +0100 Subject: [PATCH] Used ProtoUtils instead of ProtoWIthFSTService for serialization --- .../server/actors/ActorSystemContext.java | 7 +- .../device/DeviceMsgConstructorV1.java | 9 +- .../tenant/TenantMsgConstructorV1.java | 7 +- .../device/DeviceEdgeProcessorV1.java | 10 +- .../profile/DeviceProfileEdgeProcessorV1.java | 11 +- .../queue/DefaultTbClusterService.java | 11 +- .../queue/DefaultTbCoreConsumerService.java | 42 +- .../DefaultTbRuleEngineConsumerService.java | 8 +- .../service/queue/TbCoreConsumerStats.java | 11 +- .../processing/AbstractConsumerService.java | 29 +- .../DefaultGitVersionControlQueueService.java | 8 +- .../transport/DefaultTransportApiService.java | 19 +- .../server/edge/AbstractEdgeTest.java | 19 +- .../rpc/processor/BaseEdgeProcessorTest.java | 4 - .../device/AbstractDeviceProcessorTest.java | 2 - .../queue/DefaultTbClusterServiceTest.java | 5 +- .../DefaultTransportApiServiceTest.java | 3 - .../server/cache/device/DeviceRedisCache.java | 4 +- .../server/common/util/ProtoUtils.java | 518 +++++++++++++++--- common/proto/src/main/proto/queue.proto | 192 +++++-- .../RemoteNotificationRuleProcessor.java | 7 +- .../util/DataDecodingEncodingService.java | 27 - .../queue/util/ProtoWithFSTService.java | 63 --- .../service/ProtoTransportEntityService.java | 16 +- .../TransportDeviceProfileCache.java | 6 +- .../TransportTenantProfileCache.java | 4 +- .../DefaultTransportDeviceProfileCache.java | 40 +- .../DefaultTransportResourceCache.java | 16 +- .../service/DefaultTransportService.java | 91 ++- .../DefaultTransportTenantProfileCache.java | 51 +- .../DefaultClusterVersionControlService.java | 18 +- .../dao/device/DeviceProfileRedisCache.java | 4 +- 32 files changed, 725 insertions(+), 537 deletions(-) delete mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/util/DataDecodingEncodingService.java delete mode 100644 common/queue/src/main/java/org/thingsboard/server/queue/util/ProtoWithFSTService.java diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 09dcf3ca1e..edbc2c34ea 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -29,7 +29,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.rule.engine.api.SmsService; @@ -93,7 +92,6 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.queue.discovery.DiscoveryService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; @@ -101,6 +99,7 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.executors.ExternalCallExecutorService; import org.thingsboard.server.service.executors.NotificationExecutorService; +import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.executors.SharedEventLoopGroupService; import org.thingsboard.server.service.mail.MailExecutorService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -183,10 +182,6 @@ public class ActorSystemContext { @Getter private DiscoveryService discoveryService; - @Autowired - @Getter - private DataDecodingEncodingService encodingService; - @Autowired @Getter private DeviceService deviceService; diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/device/DeviceMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/device/DeviceMsgConstructorV1.java index 4442387b21..3211a08d58 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/device/DeviceMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/device/DeviceMsgConstructorV1.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.edge.rpc.constructor.device; import com.google.protobuf.ByteString; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; @@ -26,7 +25,6 @@ import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import java.nio.charset.StandardCharsets; @@ -35,9 +33,6 @@ import java.nio.charset.StandardCharsets; @TbCoreComponent public class DeviceMsgConstructorV1 extends BaseDeviceMsgConstructor { - @Autowired - private DataDecodingEncodingService dataDecodingEncodingService; - @Override public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) { DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder() @@ -69,7 +64,7 @@ public class DeviceMsgConstructorV1 extends BaseDeviceMsgConstructor { .setSoftwareIdLSB(device.getSoftwareId().getId().getLeastSignificantBits()); } if (device.getDeviceData() != null) { - builder.setDeviceDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(device.getDeviceData()))); + builder.setDeviceDataBytes(ByteString.copyFrom(device.getDeviceDataBytes())); } return builder.build(); } @@ -98,7 +93,7 @@ public class DeviceMsgConstructorV1 extends BaseDeviceMsgConstructor { .setName(deviceProfile.getName()) .setDefault(deviceProfile.isDefault()) .setType(deviceProfile.getType().name()) - .setProfileDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile.getProfileData()))); + .setProfileDataBytes(ByteString.copyFrom(deviceProfile.getProfileDataBytes())); if (deviceProfile.getDefaultQueueName() != null) { builder.setDefaultQueueName(deviceProfile.getDefaultQueueName()); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java index a3f815cc18..ca2cc5fbb4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/tenant/TenantMsgConstructorV1.java @@ -16,7 +16,6 @@ package org.thingsboard.server.service.edge.rpc.constructor.tenant; import com.google.protobuf.ByteString; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Tenant; @@ -25,7 +24,6 @@ import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg; import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.rpc.utils.EdgeVersionUtils; @@ -33,9 +31,6 @@ import org.thingsboard.server.service.edge.rpc.utils.EdgeVersionUtils; @TbCoreComponent public class TenantMsgConstructorV1 implements TenantMsgConstructor { - @Autowired - private DataDecodingEncodingService dataDecodingEncodingService; - @Override public TenantUpdateMsg constructTenantUpdateMsg(UpdateMsgType msgType, Tenant tenant) { TenantUpdateMsg.Builder builder = TenantUpdateMsg.newBuilder() @@ -79,7 +74,7 @@ public class TenantMsgConstructorV1 implements TenantMsgConstructor { @Override public TenantProfileUpdateMsg constructTenantProfileUpdateMsg(UpdateMsgType msgType, TenantProfile tenantProfile, EdgeVersion edgeVersion) { ByteString profileData = EdgeVersionUtils.isEdgeVersionOlderThan(edgeVersion, EdgeVersion.V_3_6_1) ? - ByteString.empty() : ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile.getProfileData())); + ByteString.empty() : ByteString.copyFrom(tenantProfile.getProfileDataBytes()); TenantProfileUpdateMsg.Builder builder = TenantProfileUpdateMsg.newBuilder() .setMsgType(msgType) .setIdMSB(tenantProfile.getId().getId().getMostSignificantBits()) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorV1.java index 98405ca44b..9d970b2bc6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/DeviceEdgeProcessorV1.java @@ -16,11 +16,9 @@ package org.thingsboard.server.service.edge.rpc.processor.device; import com.datastax.oss.driver.api.core.uuid.Uuids; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.device.data.DeviceData; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; @@ -30,19 +28,14 @@ import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg; import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; -import java.util.Optional; import java.util.UUID; @Component @TbCoreComponent public class DeviceEdgeProcessorV1 extends DeviceEdgeProcessor { - @Autowired - private DataDecodingEncodingService dataDecodingEncodingService; - @Override protected Device constructDeviceFromUpdateMsg(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg) { Device device = new Device(); @@ -57,8 +50,7 @@ public class DeviceEdgeProcessorV1 extends DeviceEdgeProcessor { UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB()); device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null); - Optional deviceDataOpt = dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); - device.setDeviceData(deviceDataOpt.orElse(null)); + device.setDeviceDataBytes(deviceUpdateMsg.getDeviceDataBytes().toByteArray()); UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB()); device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/profile/DeviceProfileEdgeProcessorV1.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/profile/DeviceProfileEdgeProcessorV1.java index 03461356df..d0f7c2004f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/profile/DeviceProfileEdgeProcessorV1.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/device/profile/DeviceProfileEdgeProcessorV1.java @@ -16,34 +16,27 @@ package org.thingsboard.server.service.edge.rpc.processor.device.profile; import com.datastax.oss.driver.api.core.uuid.Uuids; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; import org.thingsboard.server.common.data.DeviceProfileType; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.device.profile.DeviceProfileData; import org.thingsboard.server.common.data.id.DashboardId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.OtaPackageId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import java.nio.charset.StandardCharsets; -import java.util.Optional; import java.util.UUID; @Component @TbCoreComponent public class DeviceProfileEdgeProcessorV1 extends DeviceProfileEdgeProcessor { - @Autowired - private DataDecodingEncodingService dataDecodingEncodingService; - @Override protected DeviceProfile constructDeviceProfileFromUpdateMsg(TenantId tenantId, DeviceProfileId deviceProfileId, DeviceProfileUpdateMsg deviceProfileUpdateMsg) { DeviceProfile deviceProfile = new DeviceProfile(); @@ -63,9 +56,7 @@ public class DeviceProfileEdgeProcessorV1 extends DeviceProfileEdgeProcessor { ? deviceProfileUpdateMsg.getProvisionDeviceKey() : null); deviceProfile.setDefaultQueueName(deviceProfileUpdateMsg.getDefaultQueueName()); - Optional profileDataOpt = - dataDecodingEncodingService.decode(deviceProfileUpdateMsg.getProfileDataBytes().toByteArray()); - deviceProfile.setProfileData(profileDataOpt.orElse(null)); + deviceProfile.setProfileDataBytes(deviceProfileUpdateMsg.getProfileDataBytes().toByteArray()); String defaultQueueName = StringUtils.isNotBlank(deviceProfileUpdateMsg.getDefaultQueueName()) ? deviceProfileUpdateMsg.getDefaultQueueName() : null; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 14930912d1..ccdf6ea595 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.queue; -import com.google.protobuf.ByteString; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -57,6 +56,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg; import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -68,10 +68,9 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -112,7 +111,6 @@ public class DefaultTbClusterService implements TbClusterService { private OtaPackageStateService otaPackageStateService; private final TopicService topicService; - private final DataDecodingEncodingService encodingService; private final TbDeviceProfileCache deviceProfileCache; private final TbAssetProfileCache assetProfileCache; private final GatewayNotificationsService gatewayNotificationsService; @@ -351,10 +349,7 @@ public class DefaultTbClusterService implements TbClusterService { public void broadcastEntityChangeToTransport(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) { String entityName = (entity instanceof HasName) ? ((HasName) entity).getName() : entity.getClass().getName(); log.trace("[{}][{}][{}] Processing [{}] change event", tenantId, entityid.getEntityType(), entityid.getId(), entityName); - TransportProtos.EntityUpdateMsg entityUpdateMsg = TransportProtos.EntityUpdateMsg.newBuilder() - .setEntityType(entityid.getEntityType().name()) - .setData(ByteString.copyFrom(encodingService.encode(entity))).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityUpdateMsg(entityUpdateMsg).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityUpdateMsg(ProtoUtils.toEntityUpdateProto(entity)).build(); broadcast(transportMsg, callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 5283713126..54242f8c71 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -44,8 +44,10 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.util.ProtoUtils; +import org.thingsboard.server.dao.resource.ImageCacheKey; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; @@ -59,6 +61,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -66,14 +69,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; -import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.edge.EdgeNotificationService; @@ -83,10 +84,8 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; -import org.thingsboard.server.dao.resource.ImageCacheKey; import org.thingsboard.server.service.resource.TbImageService; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; -import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService; import org.thingsboard.server.service.state.DeviceStateService; import org.thingsboard.server.service.subscription.SubscriptionManagerService; @@ -154,7 +153,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray()); - if (actorMsg.isPresent()) { - TbActorMsg tbActorMsg = actorMsg.get(); - if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) { - tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg); - } else { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tell(actorMsg.get()); - } - } - callback.onSuccess(); } else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) { TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg(); log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg()); @@ -373,28 +358,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService notificationRuleTrigger = encodingService.decode(toCoreNotification - .getNotificationRuleProcessorMsg().getTrigger().toByteArray()); - notificationRuleTrigger.ifPresent(notificationRuleProcessor::process); + NotificationRuleTrigger notificationRuleTrigger = + JacksonUtil.fromBytes(toCoreNotification.getNotificationRuleProcessorMsg().getTrigger().toByteArray(), NotificationRuleTrigger.class); + notificationRuleProcessor.process(notificationRuleTrigger); callback.onSuccess(); } else if (toCoreNotification.hasResourceCacheInvalidateMsg()) { forwardToResourceService(toCoreNotification.getResourceCacheInvalidateMsg(), callback); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 273d7afc97..d67f0f8354 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -39,7 +39,6 @@ import org.thingsboard.server.queue.discovery.QueueKey; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; @@ -71,7 +70,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< public DefaultTbRuleEngineConsumerService(TbRuleEngineConsumerContext ctx, TbRuleEngineQueueFactory tbRuleEngineQueueFactory, ActorSystemContext actorContext, - DataDecodingEncodingService encodingService, TbRuleEngineDeviceRpcService tbDeviceRpcService, QueueService queueService, TbDeviceProfileCache deviceProfileCache, @@ -79,7 +77,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbTenantProfileCache tenantProfileCache, TbApiUsageStateService apiUsageStateService, PartitionService partitionService, ApplicationEventPublisher eventPublisher) { - super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, + super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty()); this.ctx = ctx; this.tbDeviceRpcService = tbDeviceRpcService; @@ -152,10 +150,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< if (nfMsg.hasComponentLifecycle()) { handleComponentLifecycleMsg(id, ProtoUtils.fromProto(nfMsg.getComponentLifecycle())); callback.onSuccess(); - } else if (!nfMsg.getComponentLifecycleMsg().isEmpty()) { - //will be removed in 3.6.1 in favour of hasComponentLifecycle() - handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg()); - callback.onSuccess(); } else if (nfMsg.hasFromDeviceRpcResponse()) { TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse(); RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java index 5ff72e97e3..5e45d8c9df 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbCoreConsumerStats.java @@ -109,10 +109,9 @@ public class TbCoreConsumerStats { this.toCoreNfSubscriptionServiceCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_SUBSCRIPTION_SERVICE)); this.toCoreNfSubscriptionManagerCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_SUBSCRIPTION_MANAGER)); this.toCoreNfVersionControlResponseCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_VC_RESPONSE)); - } - private StatsCounter register(StatsCounter counter){ + private StatsCounter register(StatsCounter counter) { counters.add(counter); return counter; } @@ -170,20 +169,12 @@ public class TbCoreConsumerStats { toCoreNfDeviceRpcResponseCounter.increment(); } else if (msg.hasComponentLifecycle()) { toCoreNfComponentLifecycleCounter.increment(); - } else if (!msg.getComponentLifecycleMsg().isEmpty()) { - toCoreNfComponentLifecycleCounter.increment(); } else if (msg.hasEdgeEventUpdate()) { toCoreNfEdgeEventUpdateCounter.increment(); - } else if (!msg.getEdgeEventUpdateMsg().isEmpty()) { - toCoreNfEdgeEventUpdateCounter.increment(); } else if (msg.hasToEdgeSyncRequest()) { toCoreNfEdgeSyncRequestCounter.increment(); - } else if (!msg.getToEdgeSyncRequestMsg().isEmpty()) { - toCoreNfEdgeSyncRequestCounter.increment(); } else if (msg.hasFromEdgeSyncResponse()) { toCoreNfEdgeSyncResponseCounter.increment(); - } else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) { - toCoreNfEdgeSyncResponseCounter.increment(); } else if (msg.hasQueueUpdateMsg()) { toCoreNfQueueUpdateCounter.increment(); } else if (msg.hasQueueDeleteMsg()) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 091a5a4988..2e2910beb0 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -15,7 +15,7 @@ */ package org.thingsboard.server.service.queue.processing; -import com.google.protobuf.ByteString; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationEventPublisher; @@ -41,7 +41,6 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.util.AfterStartUp; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; @@ -62,13 +61,13 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j +@RequiredArgsConstructor public abstract class AbstractConsumerService extends TbApplicationEventListener { protected volatile ExecutorService notificationsConsumerExecutor; protected volatile boolean stopped = false; protected volatile boolean isReady = false; protected final ActorSystemContext actorContext; - protected final DataDecodingEncodingService encodingService; protected final TbTenantProfileCache tenantProfileCache; protected final TbDeviceProfileCache deviceProfileCache; protected final TbAssetProfileCache assetProfileCache; @@ -79,24 +78,6 @@ public abstract class AbstractConsumerService> nfConsumer; protected final Optional jwtSettingsService; - - public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, - TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, - TbAssetProfileCache assetProfileCache, TbApiUsageStateService apiUsageStateService, - PartitionService partitionService, ApplicationEventPublisher eventPublisher, - TbQueueConsumer> nfConsumer, Optional jwtSettingsService) { - this.actorContext = actorContext; - this.encodingService = encodingService; - this.tenantProfileCache = tenantProfileCache; - this.deviceProfileCache = deviceProfileCache; - this.assetProfileCache = assetProfileCache; - this.apiUsageStateService = apiUsageStateService; - this.partitionService = partitionService; - this.eventPublisher = eventPublisher; - this.nfConsumer = nfConsumer; - this.jwtSettingsService = jwtSettingsService; - } - public void init(String nfConsumerThreadName) { this.notificationsConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(nfConsumerThreadName)); } @@ -166,12 +147,6 @@ public abstract class AbstractConsumerService actorMsgOpt = encodingService.decode(nfMsg.toByteArray()); - actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg)); - } - protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) { if (actorMsg instanceof ComponentLifecycleMsg) { ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java index 0cc73cd945..c426d39ab5 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultGitVersionControlQueueService.java @@ -20,7 +20,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import com.google.protobuf.ByteString; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -46,6 +45,7 @@ import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest; import org.thingsboard.server.common.data.util.CollectionsUtil; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.CommitRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.EntitiesContentRequestMsg; @@ -60,7 +60,6 @@ import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.TbQueueMsgMetadata; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.scheduler.SchedulerComponent; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest; import org.thingsboard.server.service.sync.vc.data.CommitGitRequest; @@ -96,7 +95,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu private final TbServiceInfoProvider serviceInfoProvider; private final TbClusterService clusterService; - private final DataDecodingEncodingService encodingService; private final DefaultEntitiesVersionControlService entitiesVersionControlService; private final SchedulerComponent scheduler; @@ -109,12 +107,10 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu private int msgChunkSize; public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService, - DataDecodingEncodingService encodingService, @Lazy DefaultEntitiesVersionControlService entitiesVersionControlService, SchedulerComponent scheduler) { this.serviceInfoProvider = serviceInfoProvider; this.clusterService = clusterService; - this.encodingService = encodingService; this.entitiesVersionControlService = entitiesVersionControlService; this.scheduler = scheduler; } @@ -588,7 +584,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu vcSettings = entitiesVersionControlService.getVersionControlSettings(tenantId); } if (vcSettings != null) { - builder.setVcSettings(ByteString.copyFrom(encodingService.encode(vcSettings))); + builder.setVcSettings(ProtoUtils.toProto(vcSettings)); } else if (request.requiresSettings()) { throw new RuntimeException("No entity version control settings provisioned!"); } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 76e8bf3abe..edaeb67c7e 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -99,7 +99,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; @@ -139,7 +138,6 @@ public class DefaultTransportApiService implements TransportApiService { private final DeviceCredentialsService deviceCredentialsService; private final DbCallbackExecutorService dbCallbackExecutorService; private final TbClusterService tbClusterService; - private final DataDecodingEncodingService dataDecodingEncodingService; private final DeviceProvisionService deviceProvisionService; private final ResourceService resourceService; private final OtaPackageService otaPackageService; @@ -388,7 +386,7 @@ public class DefaultTransportApiService implements TransportApiService { .setDeviceInfo(ProtoUtils.toDeviceInfoProto(device)); DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); if (deviceProfile != null) { - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + builder.setDeviceProfile(ProtoUtils.toProto(deviceProfile)); } else { log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); } @@ -465,13 +463,13 @@ public class DefaultTransportApiService implements TransportApiService { if (entityType.equals(EntityType.DEVICE_PROFILE)) { DeviceProfileId deviceProfileId = new DeviceProfileId(entityUuid); DeviceProfile deviceProfile = deviceProfileCache.find(deviceProfileId); - builder.setData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + builder.setDeviceProfile(ProtoUtils.toProto(deviceProfile)); } else if (entityType.equals(EntityType.TENANT)) { TenantId tenantId = TenantId.fromUUID(entityUuid); TenantProfile tenantProfile = tenantProfileCache.get(tenantId); ApiUsageState state = apiUsageStateService.getApiUsageState(tenantId); - builder.setData(ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile))); - builder.setApiState(ByteString.copyFrom(dataDecodingEncodingService.encode(state))); + builder.setTenantProfile(ProtoUtils.toProto(tenantProfile)); + builder.setApiState(ProtoUtils.toProto(state)); } else { throw new RuntimeException("Invalid entity profile request: " + entityType); } @@ -490,7 +488,7 @@ public class DefaultTransportApiService implements TransportApiService { .setDeviceProfileIdMSB(deviceProfileId.getMostSignificantBits()) .setDeviceProfileIdLSB(deviceProfileId.getLeastSignificantBits()) .setDeviceTransportConfiguration(ByteString.copyFrom( - dataDecodingEncodingService.encode(device.getDeviceData().getTransportConfiguration()) + JacksonUtil.writeValueAsBytes(device.getDeviceData().getTransportConfiguration()) ))) .build(); } else { @@ -506,11 +504,10 @@ public class DefaultTransportApiService implements TransportApiService { return Futures.immediateFuture(TransportApiResponseMsg.newBuilder() .setDeviceCredentialsResponseMsg(TransportProtos.GetDeviceCredentialsResponseMsg.newBuilder() - .setDeviceCredentialsData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceCredentials)))) + .setDeviceCredentialsData(ProtoUtils.toProto(deviceCredentials))) .build()); } - private ListenableFuture handle(GetResourceRequestMsg requestMsg) { TenantId tenantId = TenantId.fromUUID(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); ResourceType resourceType = ResourceType.valueOf(requestMsg.getResourceType()); @@ -523,7 +520,7 @@ public class DefaultTransportApiService implements TransportApiService { } if (resource != null) { - builder.setResource(ByteString.copyFrom(dataDecodingEncodingService.encode(resource))); + builder.setResource(ProtoUtils.toProto(resource)); } return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build()); @@ -556,7 +553,7 @@ public class DefaultTransportApiService implements TransportApiService { builder.setDeviceInfo(ProtoUtils.toDeviceInfoProto(device)); DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId()); if (deviceProfile != null) { - builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile))); + builder.setDeviceProfile(ProtoUtils.toProto(deviceProfile)); } else { log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId()); } diff --git a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java index 7dd2547597..8e1ec146c9 100644 --- a/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/AbstractEdgeTest.java @@ -95,7 +95,6 @@ import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg; import org.thingsboard.server.gen.edge.v1.UpdateMsgType; import org.thingsboard.server.gen.edge.v1.UplinkMsg; import org.thingsboard.server.gen.edge.v1.UserUpdateMsg; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import java.util.ArrayList; import java.util.List; @@ -124,9 +123,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { @Autowired protected EdgeEventService edgeEventService; - @Autowired - protected DataDecodingEncodingService dataDecodingEncodingService; - @Autowired protected TbClusterService clusterService; @@ -165,7 +161,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { private RuleChainId getEdgeRootRuleChainId() throws Exception { List edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?", - new TypeReference>() {}, new PageLink(100)).getData(); + new TypeReference>() { + }, new PageLink(100)).getData(); for (RuleChain edgeRuleChain : edgeRuleChains) { if (edgeRuleChain.isRoot()) { return edgeRuleChain.getId(); @@ -182,7 +179,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { doDelete("/api/edge/" + edge.getId().toString()) .andExpect(status().isOk()); edgeImitator.disconnect(); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } } private void installation() throws Exception { @@ -379,7 +377,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { Device device = doGet("/api/device/" + deviceUUID, Device.class); Assert.assertNotNull(device); List edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/devices?", - new TypeReference>() {}, new PageLink(100)).getData(); + new TypeReference>() { + }, new PageLink(100)).getData(); Assert.assertTrue(edgeDevices.stream().map(DeviceInfo::getId).anyMatch(id -> id.equals(device.getId()))); testAutoGeneratedCodeByProtobuf(deviceUpdateMsg); @@ -398,7 +397,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { Asset asset = doGet("/api/asset/" + assetUUID, Asset.class); Assert.assertNotNull(asset); List edgeAssets = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/assets?", - new TypeReference>() {}, new PageLink(100)).getData(); + new TypeReference>() { + }, new PageLink(100)).getData(); Assert.assertTrue(edgeAssets.contains(asset)); testAutoGeneratedCodeByProtobuf(assetUpdateMsg); @@ -418,7 +418,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest { RuleChain ruleChain = doGet("/api/ruleChain/" + ruleChainUUID, RuleChain.class); Assert.assertNotNull(ruleChain); List edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?", - new TypeReference>() {}, new PageLink(100)).getData(); + new TypeReference>() { + }, new PageLink(100)).getData(); Assert.assertTrue(edgeRuleChains.contains(ruleChain)); testAutoGeneratedCodeByProtobuf(ruleChainUpdateMsg); } diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java index ba0c770632..d44ac535f9 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/BaseEdgeProcessorTest.java @@ -59,7 +59,6 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.edge.v1.EdgeVersion; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorFactory; import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorV1; import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorV2; @@ -477,9 +476,6 @@ public abstract class BaseEdgeProcessorTest { @MockBean protected DbCallbackExecutorService dbCallbackExecutorService; - - @MockBean - protected DataDecodingEncodingService dataDecodingEncodingService; protected EdgeId edgeId; protected TenantId tenantId; diff --git a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java index 6f2249b173..491f10c145 100644 --- a/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/rpc/processor/device/AbstractDeviceProcessorTest.java @@ -74,8 +74,6 @@ public abstract class AbstractDeviceProcessorTest extends BaseEdgeProcessorTest willReturn(device).given(deviceService).findDeviceById(tenantId, deviceId); willReturn(deviceProfile).given(deviceProfileService).findDeviceProfileById(tenantId, deviceProfileId); - willReturn(new byte[]{0x00}).given(dataDecodingEncodingService).encode(deviceProfileData); - } protected void updateDeviceProfileDefaultFields(long expectedDashboardIdMSB, long expectedDashboardIdLSB, diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java index 233839bb5f..203aa8306a 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbClusterServiceTest.java @@ -32,10 +32,9 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.gateway_device.GatewayNotificationsService; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; @@ -64,8 +63,6 @@ public class DefaultTbClusterServiceTest { public static final String TRANSPORT = "transport"; - @MockBean - protected DataDecodingEncodingService encodingService; @MockBean protected TbDeviceProfileCache deviceProfileCache; @MockBean diff --git a/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java b/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java index 43d8960f3b..5de5bac8e3 100644 --- a/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/transport/DefaultTransportApiServiceTest.java @@ -47,7 +47,6 @@ import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; @@ -92,8 +91,6 @@ public class DefaultTransportApiServiceTest { @MockBean protected TbClusterService tbClusterService; @MockBean - protected DataDecodingEncodingService dataDecodingEncodingService; - @MockBean protected DeviceProvisionService deviceProvisionService; @MockBean protected ResourceService resourceService; diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/device/DeviceRedisCache.java b/common/cache/src/main/java/org/thingsboard/server/cache/device/DeviceRedisCache.java index f4ba0a9433..3ebb53afee 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/device/DeviceRedisCache.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/device/DeviceRedisCache.java @@ -38,13 +38,13 @@ public class DeviceRedisCache extends RedisTbTransactionalCache TransportProtos.EntityUpdateMsg toEntityUpdateProto(T entity) { + var builder = TransportProtos.EntityUpdateMsg.newBuilder(); + if (entity instanceof Device) { + builder.setDevice(toProto((Device) entity)); + } else if (entity instanceof DeviceProfile) { + builder.setDeviceProfile(toProto((DeviceProfile) entity)); + } else if (entity instanceof Tenant) { + builder.setTenant(toProto((Tenant) entity)); + } else if (entity instanceof TenantProfile) { + builder.setTenantProfile(toProto((TenantProfile) entity)); + } else if (entity instanceof ApiUsageState) { + builder.setApiUsageState(toProto((ApiUsageState) entity)); + } else { + log.warn("[{}] entity does not support toProto serialization .", entity.getClass().getSimpleName()); + } + return builder.build(); + } + public static TransportProtos.DeviceInfoProto toDeviceInfoProto(Device device) throws JsonProcessingException { TransportProtos.DeviceInfoProto.Builder builder = TransportProtos.DeviceInfoProto.newBuilder() .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) @@ -657,36 +1040,29 @@ public class ProtoUtils { return builder.build(); } + private static boolean isNotNull(Object obj) { + return obj != null; + } + private static T getEntityId(long msb, long lsb, Function entityId) { - if (msb != 0 || lsb != 0) { - return entityId.apply(new UUID(msb, lsb)); - } - return null; - } - - private static String toProtoString(String str) { - return str != null ? str : ""; - } - - private static String fromProtoString(String str) { - return StringUtils.isNotEmpty(str) ? str : null; + return entityId.apply(new UUID(msb, lsb)); } private static Long getMsb(EntityId entityId) { - if (entityId != null) { + if (isNotNull(entityId)) { return entityId.getId().getMostSignificantBits(); } return 0L; } private static Long getLsb(EntityId entityId) { - if (entityId != null) { + if (isNotNull(entityId)) { return entityId.getId().getLeastSignificantBits(); } return 0L; } private static Long checkLong(Long l) { - return l != null ? l : 0; + return isNotNull(l) ? l : 0; } } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index d8f5517e38..1d4c7c7f47 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -190,20 +190,20 @@ message DeviceProto { int64 deviceIdLSB = 4; int64 createdTime = 5; string deviceName = 6; - string deviceLabel = 7; + optional string deviceLabel = 7; string deviceType = 8; - string additionalInfo = 9; + optional string additionalInfo = 9; int64 deviceProfileIdMSB = 10; int64 deviceProfileIdLSB = 11; - int64 customerIdMSB = 12; - int64 customerIdLSB = 13; - bytes deviceData = 14; - int64 firmwareIdMSB = 15; - int64 firmwareIdLSB = 16; - int64 softwareIdMSB = 17; - int64 softwareIdLSB = 18; - int64 externalIdMSB = 19; - int64 externalIdLSB = 20; + optional int64 customerIdMSB = 12; + optional int64 customerIdLSB = 13; + optional bytes deviceData = 14; + optional int64 firmwareIdMSB = 15; + optional int64 firmwareIdLSB = 16; + optional int64 softwareIdMSB = 17; + optional int64 softwareIdLSB = 18; + optional int64 externalIdMSB = 19; + optional int64 externalIdLSB = 20; } message DeviceProfileProto { @@ -213,27 +213,108 @@ message DeviceProfileProto { int64 deviceProfileIdLSB = 4; int64 createdTime = 5; string name = 6; - string description = 7; - string image = 8; + optional string description = 7; + optional string image = 8; bool isDefault = 9; string type = 10; string transportType = 11; string provisionType = 12; - int64 defaultRuleChainIdMSB = 13; - int64 defaultRuleChainIdLSB = 14; - int64 defaultDashboardIdMSB = 15; - int64 defaultDashboardIdLSB = 16; - string defaultQueueName = 17; - bytes deviceProfileData = 18; - string provisionDeviceKey = 19; - int64 firmwareIdMSB = 20; - int64 firmwareIdLSB = 21; - int64 softwareIdMSB = 22; - int64 softwareIdLSB = 23; - int64 defaultEdgeRuleChainIdMSB = 24; - int64 defaultEdgeRuleChainIdLSB = 25; - int64 externalIdMSB = 26; - int64 externalIdLSB = 27; + optional int64 defaultRuleChainIdMSB = 13; + optional int64 defaultRuleChainIdLSB = 14; + optional int64 defaultDashboardIdMSB = 15; + optional int64 defaultDashboardIdLSB = 16; + optional string defaultQueueName = 17; + optional bytes deviceProfileData = 18; + optional string provisionDeviceKey = 19; + optional int64 firmwareIdMSB = 20; + optional int64 firmwareIdLSB = 21; + optional int64 softwareIdMSB = 22; + optional int64 softwareIdLSB = 23; + optional int64 defaultEdgeRuleChainIdMSB = 24; + optional int64 defaultEdgeRuleChainIdLSB = 25; + optional int64 externalIdMSB = 26; + optional int64 externalIdLSB = 27; +} + +message TenantProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 createdTime = 3; + string title = 4; + optional string region = 5; + int64 tenantProfileIdMSB = 6; + int64 tenantProfileIdLSB = 7; + optional string country = 8; + optional string state = 9; + optional string city = 10; + optional string address = 11; + optional string address2 = 12; + optional string zip = 13; + optional string phone = 14; + optional string email = 15; + optional string additionalInfo = 16; +} + +message TenantProfileProto { + int64 tenantProfileIdMSB = 1; + int64 tenantProfileIdLSB = 2; + int64 createdTime = 3; + string name = 4; + optional string description = 5; + bool isDefault = 6; + bool isolatedTbRuleEngine = 7; + optional bytes profileData = 8; +} + +message TbResourceProto { + int64 tenantIdMSB = 1; + int64 tenantIdLSB = 2; + int64 resourceIdMSB = 3; + int64 resourceIdLSB = 4; + int64 createdTime = 5; + string title = 6; + string resourceType = 7; + string resourceKey = 8; + string searchText = 9; + optional string etag = 10; + string fileName = 11; + optional string resourceDescriptor = 12; + optional int64 externalIdMSB = 13; + optional int64 externalIdLSB = 14; + optional bytes data = 15; + optional bytes preview = 16; +} + +message ApiUsageStateProto { + int64 tenantProfileIdMSB = 1; + int64 tenantProfileIdLSB = 2; + int64 apiUsageStateIdMSB = 3; + int64 apiUsageStateIdLSB = 4; + int64 createdTime = 5; + EntityTypeProto entityType = 6; + int64 entityIdMSB = 7; + int64 entityIdLSB = 8; + string transportState = 9; + string dbStorageState = 10; + string reExecState = 11; + string jsExecState = 12; + string tbelExecState = 13; + string emailExecState = 14; + string smsExecState = 15; + string alarmExecState = 16; +} + +message RepositorySettingsProto { + string repositoryUri = 1; + string authMethod = 2; + optional string username = 3; + optional string password = 4; + optional string privateKeyFileName = 5; + optional string privateKey = 6; + optional string privateKeyPassword = 7; + optional string defaultBranch = 8; + bool readOnly = 9; + bool showMergeCommits = 10; } /** @@ -295,7 +376,7 @@ message ValidateBasicMqttCredRequestMsg { message ValidateDeviceCredentialsResponseMsg { DeviceInfoProto deviceInfo = 1; string credentialsBody = 2; - bytes profileBody = 3; + DeviceProfileProto deviceProfile = 3; } message GetOrCreateDeviceFromGatewayRequestMsg { @@ -307,7 +388,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg { message GetOrCreateDeviceFromGatewayResponseMsg { DeviceInfoProto deviceInfo = 1; - bytes profileBody = 2; + DeviceProfileProto deviceProfile = 2; TransportApiRequestErrorCode error = 3; } @@ -390,7 +471,7 @@ message GetResourceRequestMsg { } message GetResourceResponseMsg { - bytes resource = 1; + TbResourceProto resource = 1; } message ValidateDeviceLwM2MCredentialsRequestMsg { @@ -418,8 +499,11 @@ message GetDeviceProfileRequestMsg { message GetEntityProfileResponseMsg { string entityType = 1; - bytes data = 2; - bytes apiState = 3; + oneof data { + TenantProfileProto tenantProfile = 2; + DeviceProfileProto deviceProfile = 3; + } + ApiUsageStateProto apiState = 4; } message GetDeviceRequestMsg { @@ -430,6 +514,7 @@ message GetDeviceRequestMsg { message GetDeviceResponseMsg { int64 deviceProfileIdMSB = 1; int64 deviceProfileIdLSB = 2; + //Json bytes deviceTransportConfiguration = 3; } @@ -439,7 +524,7 @@ message GetDeviceCredentialsRequestMsg { } message GetDeviceCredentialsResponseMsg { - bytes deviceCredentialsData = 1; + DeviceCredentialsProto deviceCredentialsData = 1; } message GetSnmpDevicesRequestMsg { @@ -453,8 +538,13 @@ message GetSnmpDevicesResponseMsg { } message EntityUpdateMsg { - string entityType = 1; - bytes data = 2; + oneof entityUpdate { + TenantProto tenant = 1; + TenantProfileProto tenantProfile = 2; + DeviceProto device = 3; + DeviceProfileProto deviceProfile = 4; + ApiUsageStateProto apiUsageState = 5; + } } message EntityDeleteMsg { @@ -1242,7 +1332,7 @@ message ToVersionControlServiceMsg { int64 tenantIdLSB = 3; int64 requestIdMSB = 4; int64 requestIdLSB = 5; - bytes vcSettings = 6; + RepositorySettingsProto vcSettings = 6; GenericRepositoryRequestMsg initRepositoryRequest = 7; GenericRepositoryRequestMsg testRepositoryRequest = 8; GenericRepositoryRequestMsg clearRepositoryRequest = 9; @@ -1326,21 +1416,17 @@ message ToCoreMsg { message ToCoreNotificationMsg { LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; - bytes componentLifecycleMsg = 3 [deprecated = true]; - bytes edgeEventUpdateMsg = 4 [deprecated = true]; - QueueUpdateMsg queueUpdateMsg = 5; - QueueDeleteMsg queueDeleteMsg = 6; - VersionControlResponseMsg vcResponseMsg = 7; - bytes toEdgeSyncRequestMsg = 8 [deprecated = true]; - bytes fromEdgeSyncResponseMsg = 9 [deprecated = true]; - SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10; - NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11; - ComponentLifecycleMsgProto componentLifecycle = 12; - CoreStartupMsg coreStartupMsg = 13; - EdgeEventUpdateMsgProto edgeEventUpdate = 14; - ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 15; - FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 16; - ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 17; + QueueUpdateMsg queueUpdateMsg = 3; + QueueDeleteMsg queueDeleteMsg = 4; + VersionControlResponseMsg vcResponseMsg = 5; + SubscriptionMgrMsgProto toSubscriptionMgrMsg = 6; + NotificationRuleProcessorMsg notificationRuleProcessorMsg = 7; + ComponentLifecycleMsgProto componentLifecycle = 8; + CoreStartupMsg coreStartupMsg = 9; + EdgeEventUpdateMsgProto edgeEventUpdate = 10; + ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 11; + FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12; + ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13; } /* Messages that are handled by ThingsBoard RuleEngine Service */ @@ -1353,7 +1439,6 @@ message ToRuleEngineMsg { } message ToRuleEngineNotificationMsg { - bytes componentLifecycleMsg = 1 [deprecated = true]; FromDeviceRPCResponseProto fromDeviceRpcResponse = 2; QueueUpdateMsg queueUpdateMsg = 3; QueueDeleteMsg queueDeleteMsg = 4; @@ -1416,6 +1501,7 @@ message NotificationSchedulerServiceMsg { } message NotificationRuleProcessorMsg { + //Json bytes trigger = 1; } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java b/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java index 8ad5816b77..3208a8cf0d 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/notification/RemoteNotificationRuleProcessor.java @@ -20,16 +20,16 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger; import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import java.util.UUID; @@ -43,7 +43,6 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso private final TbQueueProducerProvider producerProvider; private final TopicService topicService; private final PartitionService partitionService; - private final DataDecodingEncodingService encodingService; @Override public void process(NotificationRuleTrigger trigger) { @@ -54,7 +53,7 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso log.debug("Submitting notification rule trigger: {}", trigger); TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder() - .setTrigger(ByteString.copyFrom(encodingService.encode(trigger))); + .setTrigger(ByteString.copyFrom(JacksonUtil.writeValueAsBytes(trigger))); partitionService.getAllServiceIds(ServiceType.TB_CORE).stream().findAny().ifPresent(serviceId -> { TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/DataDecodingEncodingService.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/DataDecodingEncodingService.java deleted file mode 100644 index 2ac96142d9..0000000000 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/DataDecodingEncodingService.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Copyright © 2016-2023 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.queue.util; - -import java.util.Optional; - -public interface DataDecodingEncodingService { - - Optional decode(byte[] byteArray); - - byte[] encode(T msq); - -} - diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/ProtoWithFSTService.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/ProtoWithFSTService.java deleted file mode 100644 index 204502068c..0000000000 --- a/common/queue/src/main/java/org/thingsboard/server/queue/util/ProtoWithFSTService.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Copyright © 2016-2023 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.queue.util; - -import lombok.extern.slf4j.Slf4j; -import org.nustaq.serialization.FSTConfiguration; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.FSTUtils; -import org.thingsboard.server.common.data.FstStatsService; - -import java.util.Optional; - -@Slf4j -@Service -public class ProtoWithFSTService implements DataDecodingEncodingService { - - @Autowired - private FstStatsService fstStatsService; - - public static final FSTConfiguration CONFIG = FSTConfiguration.createDefaultConfiguration(); - - @Override - public Optional decode(byte[] byteArray) { - try { - long startTime = System.nanoTime(); - Optional optional = Optional.ofNullable(FSTUtils.decode(byteArray)); - optional.ifPresent(obj -> { - fstStatsService.recordDecodeTime(obj.getClass(), startTime); - fstStatsService.incrementDecode(obj.getClass()); - }); - return optional; - } catch (IllegalArgumentException e) { - log.error("Error during deserialization message, [{}]", e.getMessage()); - return Optional.empty(); - } - } - - - @Override - public byte[] encode(T msq) { - long startTime = System.nanoTime(); - var bytes = FSTUtils.encode(msq); - fstStatsService.recordEncodeTime(msq.getClass(), startTime); - fstStatsService.incrementEncode(msq.getClass()); - return bytes; - } - - -} diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java index 22cfaab8c7..0fdd480152 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/ProtoTransportEntityService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.transport.snmp.service; import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.device.data.DeviceData; import org.thingsboard.server.common.data.device.data.DeviceTransportConfiguration; @@ -24,8 +25,8 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbSnmpTransportComponent; import java.util.UUID; @@ -35,7 +36,6 @@ import java.util.UUID; @RequiredArgsConstructor public class ProtoTransportEntityService { private final TransportService transportService; - private final DataDecodingEncodingService dataDecodingEncodingService; public Device getDeviceById(DeviceId id) { TransportProtos.GetDeviceResponseMsg deviceProto = transportService.getDevice(TransportProtos.GetDeviceRequestMsg.newBuilder() @@ -55,9 +55,8 @@ public class ProtoTransportEntityService { device.setId(id); device.setDeviceProfileId(deviceProfileId); - DeviceTransportConfiguration deviceTransportConfiguration = (DeviceTransportConfiguration) dataDecodingEncodingService.decode( - deviceProto.getDeviceTransportConfiguration().toByteArray() - ).orElseThrow(() -> new IllegalStateException("Can't find device transport configuration")); + DeviceTransportConfiguration deviceTransportConfiguration = JacksonUtil.fromBytes( + deviceProto.getDeviceTransportConfiguration().toByteArray(), DeviceTransportConfiguration.class); DeviceData deviceData = new DeviceData(); deviceData.setTransportConfiguration(deviceTransportConfiguration); @@ -74,8 +73,11 @@ public class ProtoTransportEntityService { .build() ); - return (DeviceCredentials) dataDecodingEncodingService.decode(deviceCredentialsResponse.getDeviceCredentialsData().toByteArray()) - .orElseThrow(() -> new IllegalArgumentException("Device credentials not found")); + if (deviceCredentialsResponse.hasDeviceCredentialsData()) { + return ProtoUtils.fromProto(deviceCredentialsResponse.getDeviceCredentialsData()); + } else { + throw new IllegalArgumentException("Device credentials not found"); + } } public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(int page, int pageSize) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java index dcef95c778..0d518f6d01 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportDeviceProfileCache.java @@ -15,19 +15,19 @@ */ package org.thingsboard.server.common.transport; -import com.google.protobuf.ByteString; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.gen.transport.TransportProtos; public interface TransportDeviceProfileCache { - DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody); + DeviceProfile getOrCreate(DeviceProfileId id, TransportProtos.DeviceProfileProto proto); DeviceProfile get(DeviceProfileId id); void put(DeviceProfile profile); - DeviceProfile put(ByteString profileBody); + DeviceProfile put(TransportProtos.DeviceProfileProto proto); void evict(DeviceProfileId id); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java index 97fcd925ce..052e020108 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportTenantProfileCache.java @@ -15,11 +15,11 @@ */ package org.thingsboard.server.common.transport; -import com.google.protobuf.ByteString; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import org.thingsboard.server.gen.transport.TransportProtos; import java.util.Set; @@ -27,7 +27,7 @@ public interface TransportTenantProfileCache { TenantProfile get(TenantId tenantId); - TenantProfileUpdateResult put(ByteString profileBody); + TenantProfileUpdateResult put(TransportProtos.TenantProfileProto proto); boolean put(TenantId tenantId, TenantProfileId profileId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java index 15ac6ba469..2f53c59bd0 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportDeviceProfileCache.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.common.transport.service; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -25,11 +24,10 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.transport.TransportDeviceProfileCache; import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbTransportComponent; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; @@ -42,7 +40,6 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil private final Lock deviceProfileFetchLock = new ReentrantLock(); private final ConcurrentMap deviceProfiles = new ConcurrentHashMap<>(); - private final DataDecodingEncodingService dataDecodingEncodingService; private TransportService transportService; @@ -52,19 +49,12 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil this.transportService = transportService; } - public DefaultTransportDeviceProfileCache(DataDecodingEncodingService dataDecodingEncodingService) { - this.dataDecodingEncodingService = dataDecodingEncodingService; - } - @Override - public DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody) { + public DeviceProfile getOrCreate(DeviceProfileId id, TransportProtos.DeviceProfileProto proto) { DeviceProfile profile = deviceProfiles.get(id); if (profile == null) { - Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); - if (deviceProfile.isPresent()) { - profile = deviceProfile.get(); - deviceProfiles.put(id, profile); - } + profile = ProtoUtils.fromProto(proto); + deviceProfiles.put(id, profile); } return profile; } @@ -80,14 +70,10 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil } @Override - public DeviceProfile put(ByteString profileBody) { - Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); - if (deviceProfile.isPresent()) { - put(deviceProfile.get()); - return deviceProfile.get(); - } else { - return null; - } + public DeviceProfile put(TransportProtos.DeviceProfileProto proto) { + DeviceProfile deviceProfile = ProtoUtils.fromProto(proto); + put(deviceProfile); + return deviceProfile; } @Override @@ -107,14 +93,8 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil .setEntityIdLSB(id.getId().getLeastSignificantBits()) .build(); TransportProtos.GetEntityProfileResponseMsg entityProfileMsg = transportService.getEntityProfile(msg); - Optional profileOpt = dataDecodingEncodingService.decode(entityProfileMsg.getData().toByteArray()); - if (profileOpt.isPresent()) { - profile = profileOpt.get(); - this.put(profile); - } else { - log.warn("[{}] Can't find device profile: {}", id, entityProfileMsg.getData()); - throw new RuntimeException("Can't find device profile!"); - } + profile = ProtoUtils.fromProto(entityProfileMsg.getDeviceProfile()); + this.put(profile); } finally { deviceProfileFetchLock.unlock(); } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java index 2db0cdb9b8..c0e189c755 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.transport.service; import lombok.Data; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -24,8 +25,8 @@ import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.transport.TransportResourceCache; import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbTransportComponent; import java.util.Optional; @@ -39,19 +40,15 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j @Component @TbTransportComponent +@RequiredArgsConstructor public class DefaultTransportResourceCache implements TransportResourceCache { private final Lock resourceFetchLock = new ReentrantLock(); private final ConcurrentMap resources = new ConcurrentHashMap<>(); private final Set keys = ConcurrentHashMap.newKeySet(); - private final DataDecodingEncodingService dataDecodingEncodingService; + @Lazy private final TransportService transportService; - public DefaultTransportResourceCache(DataDecodingEncodingService dataDecodingEncodingService, @Lazy TransportService transportService) { - this.dataDecodingEncodingService = dataDecodingEncodingService; - this.transportService = transportService; - } - @Override public Optional get(TenantId tenantId, ResourceType resourceType, String resourceKey) { ResourceCompositeKey compositeKey = new ResourceCompositeKey(tenantId, resourceType, resourceKey); @@ -92,9 +89,8 @@ public class DefaultTransportResourceCache implements TransportResourceCache { .setResourceKey(compositeKey.resourceKey); TransportProtos.GetResourceResponseMsg responseMsg = transportService.getResource(builder.build()); - Optional optionalResource = dataDecodingEncodingService.decode(responseMsg.getResource().toByteArray()); - if (optionalResource.isPresent()) { - TbResource resource = optionalResource.get(); + if (responseMsg.hasResource()) { + TbResource resource = ProtoUtils.fromProto(responseMsg.getResource()); resources.put(new ResourceCompositeKey(resource.getTenantId(), resource.getResourceType(), resource.getResourceKey()), resource); return resource; } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index c4840b2402..383c909d13 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -50,9 +49,9 @@ import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.limit.LimitedApi; +import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.rpc.RpcStatus; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -80,6 +79,7 @@ import org.thingsboard.server.common.transport.limits.EntityLimitKey; import org.thingsboard.server.common.transport.limits.EntityLimitsCache; import org.thingsboard.server.common.transport.limits.TransportRateLimitService; import org.thingsboard.server.common.transport.util.JsonUtils; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; @@ -96,14 +96,13 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.TbQueueRequestTemplate; import org.thingsboard.server.queue.common.AsyncCallbackTemplate; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbTransportQueueFactory; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.util.AfterStartUp; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbTransportComponent; import javax.annotation.PostConstruct; @@ -163,7 +162,6 @@ public class DefaultTransportService implements TransportService { @Value("${transport.stats.enabled:false}") private boolean statsEnabled; - @Autowired @Lazy private TbApiUsageReportClient apiUsageClient; @@ -181,7 +179,6 @@ public class DefaultTransportService implements TransportService { private final TransportTenantProfileCache tenantProfileCache; private final TransportRateLimitService rateLimitService; - private final DataDecodingEncodingService dataDecodingEncodingService; private final SchedulerComponent scheduler; private final ApplicationEventPublisher eventPublisher; private final TransportResourceCache transportResourceCache; @@ -216,7 +213,7 @@ public class DefaultTransportService implements TransportService { TransportDeviceProfileCache deviceProfileCache, TransportTenantProfileCache tenantProfileCache, TransportRateLimitService rateLimitService, - DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler, TransportResourceCache transportResourceCache, + SchedulerComponent scheduler, TransportResourceCache transportResourceCache, ApplicationEventPublisher eventPublisher, NotificationRuleProcessor notificationRuleProcessor, EntityLimitsCache entityLimitsCache) { this.partitionService = partitionService; @@ -228,7 +225,6 @@ public class DefaultTransportService implements TransportService { this.deviceProfileCache = deviceProfileCache; this.tenantProfileCache = tenantProfileCache; this.rateLimitService = rateLimitService; - this.dataDecodingEncodingService = dataDecodingEncodingService; this.scheduler = scheduler; this.transportResourceCache = transportResourceCache; this.eventPublisher = eventPublisher; @@ -430,9 +426,8 @@ public class DefaultTransportService implements TransportService { result.credentials(msg.getCredentialsBody()); TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo()); result.deviceInfo(tdi); - ByteString profileBody = msg.getProfileBody(); - if (!profileBody.isEmpty()) { - DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody); + if (msg.hasDeviceProfile()) { + DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), msg.getDeviceProfile()); result.deviceProfile(profile); } } @@ -464,9 +459,8 @@ public class DefaultTransportService implements TransportService { result.credentials(msg.getCredentialsBody()); TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo()); result.deviceInfo(tdi); - ByteString profileBody = msg.getProfileBody(); - if (!profileBody.isEmpty()) { - DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody); + if (msg.hasDeviceProfile()) { + DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), msg.getDeviceProfile()); if (transportType != DeviceTransportType.DEFAULT && profile != null && profile.getTransportType() != DeviceTransportType.DEFAULT && profile.getTransportType() != transportType) { log.debug("[{}] Device profile [{}] has different transport type: {}, expected: {}", tdi.getDeviceId(), tdi.getDeviceProfileId(), profile.getTransportType(), transportType); @@ -480,7 +474,6 @@ public class DefaultTransportService implements TransportService { AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor); } - @Override public void process(TenantId tenantId, TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback callback) { TbProtoQueueMsg protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build()); @@ -495,9 +488,8 @@ public class DefaultTransportService implements TransportService { if (msg.hasDeviceInfo()) { TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo()); result.deviceInfo(tdi); - ByteString profileBody = msg.getProfileBody(); - if (!profileBody.isEmpty()) { - result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody)); + if (msg.hasDeviceProfile()) { + result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), msg.getDeviceProfile())); } } else if (TransportProtos.TransportApiRequestErrorCode.ENTITY_LIMIT.equals(msg.getError())) { entityLimitsCache.put(key, true); @@ -987,37 +979,7 @@ public class DefaultTransportService implements TransportService { } else { log.trace("Processing broadcast notification: {}", toSessionMsg); if (toSessionMsg.hasEntityUpdateMsg()) { - TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg(); - EntityType entityType = EntityType.valueOf(msg.getEntityType()); - if (EntityType.DEVICE_PROFILE.equals(entityType)) { - DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData()); - if (deviceProfile != null) { - log.debug("On device profile update: {}", deviceProfile); - onProfileUpdate(deviceProfile); - } - } else if (EntityType.TENANT_PROFILE.equals(entityType)) { - rateLimitService.update(tenantProfileCache.put(msg.getData())); - } else if (EntityType.TENANT.equals(entityType)) { - Optional profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); - if (profileOpt.isPresent()) { - Tenant tenant = profileOpt.get(); - partitionService.removeTenant(tenant.getId()); - boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); - if (updated) { - rateLimitService.update(tenant.getId()); - } - } - } else if (EntityType.API_USAGE_STATE.equals(entityType)) { - Optional stateOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); - if (stateOpt.isPresent()) { - ApiUsageState apiUsageState = stateOpt.get(); - rateLimitService.update(apiUsageState.getTenantId(), apiUsageState.isTransportEnabled()); - //TODO: if transport is disabled, we should close all sessions and not to check credentials. - } - } else if (EntityType.DEVICE.equals(entityType)) { - Optional deviceOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); - deviceOpt.ifPresent(this::onDeviceUpdate); - } + onEntityUpdate(toSessionMsg.getEntityUpdateMsg()); } else if (toSessionMsg.hasEntityDeleteMsg()) { TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); EntityType entityType = EntityType.valueOf(msg.getEntityType()); @@ -1086,6 +1048,37 @@ public class DefaultTransportService implements TransportService { eventPublisher.publishEvent(new DeviceProfileUpdatedEvent(deviceProfile)); } + private void onEntityUpdate(TransportProtos.EntityUpdateMsg msg) { + switch (msg.getEntityUpdateCase()) { + case DEVICEPROFILE: + DeviceProfile deviceProfile = deviceProfileCache.put(msg.getDeviceProfile()); + log.debug("On device profile update: {}", deviceProfile); + onProfileUpdate(deviceProfile); + break; + case TENANTPROFILE: + rateLimitService.update(tenantProfileCache.put(msg.getTenantProfile())); + break; + case TENANT: + Tenant tenant = ProtoUtils.fromProto(msg.getTenant()); + partitionService.removeTenant(tenant.getId()); + boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId()); + if (updated) { + rateLimitService.update(tenant.getId()); + } + break; + case APIUSAGESTATE: + ApiUsageState apiUsageState = ProtoUtils.fromProto(msg.getApiUsageState()); + rateLimitService.update(apiUsageState.getTenantId(), apiUsageState.isTransportEnabled()); + //TODO: if transport is disabled, we should close all sessions and not to check credentials. + break; + case DEVICE: + onDeviceUpdate(ProtoUtils.fromProto(msg.getDevice())); + break; + default: + log.warn("UNKNOWN entity update type: [{}]", msg.getEntityUpdateCase()); + } + } + private void onDeviceUpdate(Device device) { long deviceIdMSB = device.getId().getId().getMostSignificantBits(); long deviceIdLSB = device.getId().getId().getLeastSignificantBits(); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java index 12f89f29e6..1a8165c860 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportTenantProfileCache.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.common.transport.service; -import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -29,12 +28,11 @@ import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportTenantProfileCache; import org.thingsboard.server.common.transport.limits.TransportRateLimitService; import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbTransportComponent; import java.util.Collections; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -50,7 +48,6 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil private final ConcurrentMap profiles = new ConcurrentHashMap<>(); private final ConcurrentMap tenantIds = new ConcurrentHashMap<>(); private final ConcurrentMap> tenantProfileIds = new ConcurrentHashMap<>(); - private final DataDecodingEncodingService dataDecodingEncodingService; private TransportRateLimitService rateLimitService; private TransportService transportService; @@ -67,28 +64,18 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil this.transportService = transportService; } - public DefaultTransportTenantProfileCache(DataDecodingEncodingService dataDecodingEncodingService) { - this.dataDecodingEncodingService = dataDecodingEncodingService; - } - @Override public TenantProfile get(TenantId tenantId) { return getTenantProfile(tenantId); } @Override - public TenantProfileUpdateResult put(ByteString profileBody) { - Optional profileOpt = dataDecodingEncodingService.decode(profileBody.toByteArray()); - if (profileOpt.isPresent()) { - TenantProfile newProfile = profileOpt.get(); - log.trace("[{}] put: {}", newProfile.getId(), newProfile); - profiles.put(newProfile.getId(), newProfile); - Set affectedTenants = tenantProfileIds.get(newProfile.getId()); - return new TenantProfileUpdateResult(newProfile, affectedTenants != null ? affectedTenants : Collections.emptySet()); - } else { - log.warn("Failed to decode profile: {}", profileBody.toString()); - return new TenantProfileUpdateResult(null, Collections.emptySet()); - } + public TenantProfileUpdateResult put(TransportProtos.TenantProfileProto proto) { + TenantProfile profile = ProtoUtils.fromProto(proto); + log.trace("[{}] put: {}", profile.getId(), profile); + profiles.put(profile.getId(), profile); + Set affectedTenants = tenantProfileIds.get(profile.getId()); + return new TenantProfileUpdateResult(profile, affectedTenants != null ? affectedTenants : Collections.emptySet()); } @Override @@ -135,23 +122,17 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil .setEntityIdLSB(tenantId.getId().getLeastSignificantBits()) .build(); TransportProtos.GetEntityProfileResponseMsg entityProfileMsg = transportService.getEntityProfile(msg); - Optional profileOpt = dataDecodingEncodingService.decode(entityProfileMsg.getData().toByteArray()); - if (profileOpt.isPresent()) { - profile = profileOpt.get(); - TenantProfile existingProfile = profiles.get(profile.getId()); - if (existingProfile != null) { - profile = existingProfile; - } else { - profiles.put(profile.getId(), profile); - } - tenantProfileIds.computeIfAbsent(profile.getId(), id -> ConcurrentHashMap.newKeySet()).add(tenantId); - tenantIds.put(tenantId, profile.getId()); + profile = ProtoUtils.fromProto(entityProfileMsg.getTenantProfile()); + TenantProfile existingProfile = profiles.get(profile.getId()); + if (existingProfile != null) { + profile = existingProfile; } else { - log.warn("[{}] Can't decode tenant profile: {}", tenantId, entityProfileMsg.getData()); - throw new RuntimeException("Can't decode tenant profile!"); + profiles.put(profile.getId(), profile); } - Optional apiStateOpt = dataDecodingEncodingService.decode(entityProfileMsg.getApiState().toByteArray()); - apiStateOpt.ifPresent(apiUsageState -> rateLimitService.update(tenantId, apiUsageState.isTransportEnabled())); + tenantProfileIds.computeIfAbsent(profile.getId(), id -> ConcurrentHashMap.newKeySet()).add(tenantId); + tenantIds.put(tenantId, profile.getId()); + ApiUsageState apiUsageState = ProtoUtils.fromProto(entityProfileMsg.getApiState()); + rateLimitService.update(tenantId, apiUsageState.isTransportEnabled()); } } finally { tenantProfileFetchLock.unlock(); diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index e98d49b1dc..1a1654af66 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -36,12 +36,12 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.page.SortOrder; -import org.thingsboard.server.common.data.sync.vc.RepositorySettings; import org.thingsboard.server.common.data.sync.vc.VersionCreationResult; import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo; import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.util.ProtoUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.AddMsg; import org.thingsboard.server.gen.transport.TransportProtos.BranchInfoProto; @@ -67,13 +67,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.VersionedEntityInfoP import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory; -import org.thingsboard.server.queue.util.DataDecodingEncodingService; import org.thingsboard.server.queue.util.TbVersionControlComponent; import javax.annotation.PostConstruct; @@ -108,7 +107,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe private final PartitionService partitionService; private final TbQueueProducerProvider producerProvider; private final TbVersionControlQueueFactory queueFactory; - private final DataDecodingEncodingService encodingService; private final GitRepositoryService vcService; private final TopicService topicService; @@ -196,7 +194,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe } for (TbProtoQueueMsg msgWrapper : msgs) { ToVersionControlServiceMsg msg = msgWrapper.getValue(); - var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : getEntitiesVersionControlSettings(msg)); + var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings())); long startTs = System.currentTimeMillis(); log.trace("[{}][{}] RECEIVED task: {}", ctx.getTenantId(), ctx.getRequestId(), msg); int threadIdx = Math.abs(ctx.getTenantId().hashCode() % ioPoolSize); @@ -542,16 +540,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null); } - private RepositorySettings getEntitiesVersionControlSettings(ToVersionControlServiceMsg msg) { - Optional settingsOpt = encodingService.decode(msg.getVcSettings().toByteArray()); - if (settingsOpt.isPresent()) { - return settingsOpt.get(); - } else { - log.warn("Failed to parse VC settings: {}", msg.getVcSettings()); - throw new RuntimeException("Failed to parse vc settings!"); - } - } - private String getRelativePath(EntityType entityType, String entityId) { String path = entityType.name().toLowerCase(); if (entityId != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileRedisCache.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileRedisCache.java index a61762c9aa..0a8aa9a7fe 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileRedisCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileRedisCache.java @@ -37,13 +37,13 @@ public class DeviceProfileRedisCache extends RedisTbTransactionalCache() { @Override public byte[] serialize(DeviceProfile deviceProfile) throws SerializationException { - return ProtoUtils.toDeviceProfileProto(deviceProfile).toByteArray(); + return ProtoUtils.toProto(deviceProfile).toByteArray(); } @Override public DeviceProfile deserialize(DeviceProfileCacheKey key, byte[] bytes) throws SerializationException { try { - return ProtoUtils.fromDeviceProfileProto(TransportProtos.DeviceProfileProto.parseFrom(bytes)); + return ProtoUtils.fromProto(TransportProtos.DeviceProfileProto.parseFrom(bytes)); } catch (InvalidProtocolBufferException e) { throw new SerializationException(e.getMessage()); }