Used ProtoUtils instead of ProtoWIthFSTService for serialization

This commit is contained in:
YevhenBondarenko 2023-12-20 17:33:04 +01:00
parent 5eadc90d26
commit 8ccd70fc65
32 changed files with 725 additions and 537 deletions

View File

@ -29,7 +29,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.NotificationCenter;
import org.thingsboard.rule.engine.api.SmsService;
@ -93,7 +92,6 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
@ -101,6 +99,7 @@ import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.executors.ExternalCallExecutorService;
import org.thingsboard.server.service.executors.NotificationExecutorService;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.service.executors.SharedEventLoopGroupService;
import org.thingsboard.server.service.mail.MailExecutorService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -183,10 +182,6 @@ public class ActorSystemContext {
@Getter
private DiscoveryService discoveryService;
@Autowired
@Getter
private DataDecodingEncodingService encodingService;
@Autowired
@Getter
private DeviceService deviceService;

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.edge.rpc.constructor.device;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
@ -26,7 +25,6 @@ import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.nio.charset.StandardCharsets;
@ -35,9 +33,6 @@ import java.nio.charset.StandardCharsets;
@TbCoreComponent
public class DeviceMsgConstructorV1 extends BaseDeviceMsgConstructor {
@Autowired
private DataDecodingEncodingService dataDecodingEncodingService;
@Override
public DeviceUpdateMsg constructDeviceUpdatedMsg(UpdateMsgType msgType, Device device) {
DeviceUpdateMsg.Builder builder = DeviceUpdateMsg.newBuilder()
@ -69,7 +64,7 @@ public class DeviceMsgConstructorV1 extends BaseDeviceMsgConstructor {
.setSoftwareIdLSB(device.getSoftwareId().getId().getLeastSignificantBits());
}
if (device.getDeviceData() != null) {
builder.setDeviceDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(device.getDeviceData())));
builder.setDeviceDataBytes(ByteString.copyFrom(device.getDeviceDataBytes()));
}
return builder.build();
}
@ -98,7 +93,7 @@ public class DeviceMsgConstructorV1 extends BaseDeviceMsgConstructor {
.setName(deviceProfile.getName())
.setDefault(deviceProfile.isDefault())
.setType(deviceProfile.getType().name())
.setProfileDataBytes(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile.getProfileData())));
.setProfileDataBytes(ByteString.copyFrom(deviceProfile.getProfileDataBytes()));
if (deviceProfile.getDefaultQueueName() != null) {
builder.setDefaultQueueName(deviceProfile.getDefaultQueueName());
}

View File

@ -16,7 +16,6 @@
package org.thingsboard.server.service.edge.rpc.constructor.tenant;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Tenant;
@ -25,7 +24,6 @@ import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.gen.edge.v1.TenantProfileUpdateMsg;
import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.edge.rpc.utils.EdgeVersionUtils;
@ -33,9 +31,6 @@ import org.thingsboard.server.service.edge.rpc.utils.EdgeVersionUtils;
@TbCoreComponent
public class TenantMsgConstructorV1 implements TenantMsgConstructor {
@Autowired
private DataDecodingEncodingService dataDecodingEncodingService;
@Override
public TenantUpdateMsg constructTenantUpdateMsg(UpdateMsgType msgType, Tenant tenant) {
TenantUpdateMsg.Builder builder = TenantUpdateMsg.newBuilder()
@ -79,7 +74,7 @@ public class TenantMsgConstructorV1 implements TenantMsgConstructor {
@Override
public TenantProfileUpdateMsg constructTenantProfileUpdateMsg(UpdateMsgType msgType, TenantProfile tenantProfile, EdgeVersion edgeVersion) {
ByteString profileData = EdgeVersionUtils.isEdgeVersionOlderThan(edgeVersion, EdgeVersion.V_3_6_1) ?
ByteString.empty() : ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile.getProfileData()));
ByteString.empty() : ByteString.copyFrom(tenantProfile.getProfileDataBytes());
TenantProfileUpdateMsg.Builder builder = TenantProfileUpdateMsg.newBuilder()
.setMsgType(msgType)
.setIdMSB(tenantProfile.getId().getId().getMostSignificantBits())

View File

@ -16,11 +16,9 @@
package org.thingsboard.server.service.edge.rpc.processor.device;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
@ -30,19 +28,14 @@ import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.gen.edge.v1.DeviceCredentialsUpdateMsg;
import org.thingsboard.server.gen.edge.v1.DeviceUpdateMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Optional;
import java.util.UUID;
@Component
@TbCoreComponent
public class DeviceEdgeProcessorV1 extends DeviceEdgeProcessor {
@Autowired
private DataDecodingEncodingService dataDecodingEncodingService;
@Override
protected Device constructDeviceFromUpdateMsg(TenantId tenantId, DeviceId deviceId, DeviceUpdateMsg deviceUpdateMsg) {
Device device = new Device();
@ -57,8 +50,7 @@ public class DeviceEdgeProcessorV1 extends DeviceEdgeProcessor {
UUID deviceProfileUUID = safeGetUUID(deviceUpdateMsg.getDeviceProfileIdMSB(), deviceUpdateMsg.getDeviceProfileIdLSB());
device.setDeviceProfileId(deviceProfileUUID != null ? new DeviceProfileId(deviceProfileUUID) : null);
Optional<DeviceData> deviceDataOpt = dataDecodingEncodingService.decode(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
device.setDeviceData(deviceDataOpt.orElse(null));
device.setDeviceDataBytes(deviceUpdateMsg.getDeviceDataBytes().toByteArray());
UUID firmwareUUID = safeGetUUID(deviceUpdateMsg.getFirmwareIdMSB(), deviceUpdateMsg.getFirmwareIdLSB());
device.setFirmwareId(firmwareUUID != null ? new OtaPackageId(firmwareUUID) : null);

View File

@ -16,34 +16,27 @@
package org.thingsboard.server.service.edge.rpc.processor.device.profile;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.device.profile.DeviceProfileData;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.gen.edge.v1.DeviceProfileUpdateMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.UUID;
@Component
@TbCoreComponent
public class DeviceProfileEdgeProcessorV1 extends DeviceProfileEdgeProcessor {
@Autowired
private DataDecodingEncodingService dataDecodingEncodingService;
@Override
protected DeviceProfile constructDeviceProfileFromUpdateMsg(TenantId tenantId, DeviceProfileId deviceProfileId, DeviceProfileUpdateMsg deviceProfileUpdateMsg) {
DeviceProfile deviceProfile = new DeviceProfile();
@ -63,9 +56,7 @@ public class DeviceProfileEdgeProcessorV1 extends DeviceProfileEdgeProcessor {
? deviceProfileUpdateMsg.getProvisionDeviceKey() : null);
deviceProfile.setDefaultQueueName(deviceProfileUpdateMsg.getDefaultQueueName());
Optional<DeviceProfileData> profileDataOpt =
dataDecodingEncodingService.decode(deviceProfileUpdateMsg.getProfileDataBytes().toByteArray());
deviceProfile.setProfileData(profileDataOpt.orElse(null));
deviceProfile.setProfileDataBytes(deviceProfileUpdateMsg.getProfileDataBytes().toByteArray());
String defaultQueueName = StringUtils.isNotBlank(deviceProfileUpdateMsg.getDefaultQueueName())
? deviceProfileUpdateMsg.getDefaultQueueName() : null;

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.queue;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -57,6 +56,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -68,10 +68,9 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -112,7 +111,6 @@ public class DefaultTbClusterService implements TbClusterService {
private OtaPackageStateService otaPackageStateService;
private final TopicService topicService;
private final DataDecodingEncodingService encodingService;
private final TbDeviceProfileCache deviceProfileCache;
private final TbAssetProfileCache assetProfileCache;
private final GatewayNotificationsService gatewayNotificationsService;
@ -351,10 +349,7 @@ public class DefaultTbClusterService implements TbClusterService {
public <T> void broadcastEntityChangeToTransport(TenantId tenantId, EntityId entityid, T entity, TbQueueCallback callback) {
String entityName = (entity instanceof HasName) ? ((HasName) entity).getName() : entity.getClass().getName();
log.trace("[{}][{}][{}] Processing [{}] change event", tenantId, entityid.getEntityType(), entityid.getId(), entityName);
TransportProtos.EntityUpdateMsg entityUpdateMsg = TransportProtos.EntityUpdateMsg.newBuilder()
.setEntityType(entityid.getEntityType().name())
.setData(ByteString.copyFrom(encodingService.encode(entity))).build();
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityUpdateMsg(entityUpdateMsg).build();
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setEntityUpdateMsg(ProtoUtils.toEntityUpdateProto(entity)).build();
broadcast(transportMsg, callback);
}

View File

@ -44,8 +44,10 @@ import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.dao.resource.ImageCacheKey;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
@ -59,6 +61,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
@ -66,14 +69,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMs
import org.thingsboard.server.gen.transport.TransportProtos.ToOtaPackageStateServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TbEntitySubEventProto;
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.edge.EdgeNotificationService;
@ -83,10 +84,8 @@ import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.dao.resource.ImageCacheKey;
import org.thingsboard.server.service.resource.TbImageService;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.security.auth.jwt.settings.JwtSettingsService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.subscription.SubscriptionManagerService;
@ -154,7 +153,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
DeviceStateService stateService,
TbLocalSubscriptionService localSubscriptionService,
SubscriptionManagerService subscriptionManagerService,
DataDecodingEncodingService encodingService,
TbCoreDeviceRpcService tbCoreDeviceRpcService,
StatsFactory statsFactory,
TbDeviceProfileCache deviceProfileCache,
@ -171,7 +169,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
NotificationSchedulerService notificationSchedulerService,
NotificationRuleProcessor notificationRuleProcessor,
TbImageService imageService) {
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer(), jwtSettingsService);
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService, eventPublisher, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer(), jwtSettingsService);
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
this.firmwareStatesConsumer = tbCoreQueueFactory.createToOtaPackageStateServiceMsgConsumer();
@ -284,19 +282,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
}
}
callback.onSuccess();
} else if (!toCoreMsg.getToDeviceActorNotificationMsg().isEmpty()) {
// will be removed in 3.6.1 in favour of hasToDeviceActorNotification()
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreMsg.getToDeviceActorNotificationMsg().toByteArray());
if (actorMsg.isPresent()) {
TbActorMsg tbActorMsg = actorMsg.get();
if (tbActorMsg.getMsgType().equals(MsgType.DEVICE_RPC_REQUEST_TO_DEVICE_ACTOR_MSG)) {
tbCoreDeviceRpcService.forwardRpcRequestToDeviceActor((ToDeviceRpcRequestActorMsg) tbActorMsg);
} else {
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
actorContext.tell(actorMsg.get());
}
}
callback.onSuccess();
} else if (toCoreMsg.hasNotificationSchedulerServiceMsg()) {
TransportProtos.NotificationSchedulerServiceMsg notificationSchedulerServiceMsg = toCoreMsg.getNotificationSchedulerServiceMsg();
log.trace("[{}] Forwarding message to notification scheduler service {}", id, toCoreMsg.getNotificationSchedulerServiceMsg());
@ -373,28 +358,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreNotification.hasComponentLifecycle()) {
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCoreNotification.getComponentLifecycle()));
callback.onSuccess();
} else if (!toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
//will be removed in 3.6.1 in favour of hasComponentLifecycle()
handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg());
callback.onSuccess();
} else if (toCoreNotification.hasEdgeEventUpdate()) {
forwardToAppActor(id, ProtoUtils.fromProto(toCoreNotification.getEdgeEventUpdate()));
callback.onSuccess();
} else if (!toCoreNotification.getEdgeEventUpdateMsg().isEmpty()) {
//will be removed in 3.6.1 in favour of hasEdgeEventUpdate()
forwardToAppActor(id, encodingService.decode(toCoreNotification.getEdgeEventUpdateMsg().toByteArray()), callback);
} else if (toCoreNotification.hasToEdgeSyncRequest()) {
forwardToAppActor(id, ProtoUtils.fromProto(toCoreNotification.getToEdgeSyncRequest()));
callback.onSuccess();
} else if (!toCoreNotification.getToEdgeSyncRequestMsg().isEmpty()) {
//will be removed in 3.6.1 in favour of hasToEdgeSyncRequest()
forwardToAppActor(id, encodingService.decode(toCoreNotification.getToEdgeSyncRequestMsg().toByteArray()), callback);
} else if (toCoreNotification.hasFromEdgeSyncResponse()) {
forwardToAppActor(id, ProtoUtils.fromProto(toCoreNotification.getFromEdgeSyncResponse()));
callback.onSuccess();
} else if (!toCoreNotification.getFromEdgeSyncResponseMsg().isEmpty()) {
//will be removed in 3.6.1 in favour of hasFromEdgeSyncResponse()
forwardToAppActor(id, encodingService.decode(toCoreNotification.getFromEdgeSyncResponseMsg().toByteArray()), callback);
} else if (toCoreNotification.hasQueueUpdateMsg()) {
TransportProtos.QueueUpdateMsg queue = toCoreNotification.getQueueUpdateMsg();
partitionService.updateQueue(queue);
@ -409,9 +381,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
} else if (toCoreNotification.hasToSubscriptionMgrMsg()) {
forwardToSubMgrService(toCoreNotification.getToSubscriptionMgrMsg(), callback);
} else if (toCoreNotification.hasNotificationRuleProcessorMsg()) {
Optional<NotificationRuleTrigger> notificationRuleTrigger = encodingService.decode(toCoreNotification
.getNotificationRuleProcessorMsg().getTrigger().toByteArray());
notificationRuleTrigger.ifPresent(notificationRuleProcessor::process);
NotificationRuleTrigger notificationRuleTrigger =
JacksonUtil.fromBytes(toCoreNotification.getNotificationRuleProcessorMsg().getTrigger().toByteArray(), NotificationRuleTrigger.class);
notificationRuleProcessor.process(notificationRuleTrigger);
callback.onSuccess();
} else if (toCoreNotification.hasResourceCacheInvalidateMsg()) {
forwardToResourceService(toCoreNotification.getResourceCacheInvalidateMsg(), callback);

View File

@ -39,7 +39,6 @@ import org.thingsboard.server.queue.discovery.QueueKey;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbRuleEngineQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
@ -71,7 +70,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
public DefaultTbRuleEngineConsumerService(TbRuleEngineConsumerContext ctx,
TbRuleEngineQueueFactory tbRuleEngineQueueFactory,
ActorSystemContext actorContext,
DataDecodingEncodingService encodingService,
TbRuleEngineDeviceRpcService tbDeviceRpcService,
QueueService queueService,
TbDeviceProfileCache deviceProfileCache,
@ -79,7 +77,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
TbTenantProfileCache tenantProfileCache,
TbApiUsageStateService apiUsageStateService,
PartitionService partitionService, ApplicationEventPublisher eventPublisher) {
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
super(actorContext, tenantProfileCache, deviceProfileCache, assetProfileCache, apiUsageStateService, partitionService,
eventPublisher, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer(), Optional.empty());
this.ctx = ctx;
this.tbDeviceRpcService = tbDeviceRpcService;
@ -152,10 +150,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
if (nfMsg.hasComponentLifecycle()) {
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(nfMsg.getComponentLifecycle()));
callback.onSuccess();
} else if (!nfMsg.getComponentLifecycleMsg().isEmpty()) {
//will be removed in 3.6.1 in favour of hasComponentLifecycle()
handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg());
callback.onSuccess();
} else if (nfMsg.hasFromDeviceRpcResponse()) {
TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;

View File

@ -109,10 +109,9 @@ public class TbCoreConsumerStats {
this.toCoreNfSubscriptionServiceCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_SUBSCRIPTION_SERVICE));
this.toCoreNfSubscriptionManagerCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_SUBSCRIPTION_MANAGER));
this.toCoreNfVersionControlResponseCounter = register(statsFactory.createStatsCounter(statsKey, TO_CORE_NF_VC_RESPONSE));
}
private StatsCounter register(StatsCounter counter){
private StatsCounter register(StatsCounter counter) {
counters.add(counter);
return counter;
}
@ -170,20 +169,12 @@ public class TbCoreConsumerStats {
toCoreNfDeviceRpcResponseCounter.increment();
} else if (msg.hasComponentLifecycle()) {
toCoreNfComponentLifecycleCounter.increment();
} else if (!msg.getComponentLifecycleMsg().isEmpty()) {
toCoreNfComponentLifecycleCounter.increment();
} else if (msg.hasEdgeEventUpdate()) {
toCoreNfEdgeEventUpdateCounter.increment();
} else if (!msg.getEdgeEventUpdateMsg().isEmpty()) {
toCoreNfEdgeEventUpdateCounter.increment();
} else if (msg.hasToEdgeSyncRequest()) {
toCoreNfEdgeSyncRequestCounter.increment();
} else if (!msg.getToEdgeSyncRequestMsg().isEmpty()) {
toCoreNfEdgeSyncRequestCounter.increment();
} else if (msg.hasFromEdgeSyncResponse()) {
toCoreNfEdgeSyncResponseCounter.increment();
} else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) {
toCoreNfEdgeSyncResponseCounter.increment();
} else if (msg.hasQueueUpdateMsg()) {
toCoreNfQueueUpdateCounter.increment();
} else if (msg.hasQueueDeleteMsg()) {

View File

@ -15,7 +15,7 @@
*/
package org.thingsboard.server.service.queue.processing;
import com.google.protobuf.ByteString;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationEventPublisher;
@ -41,7 +41,6 @@ import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
@ -62,13 +61,13 @@ import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractConsumerService<N extends com.google.protobuf.GeneratedMessageV3> extends TbApplicationEventListener<PartitionChangeEvent> {
protected volatile ExecutorService notificationsConsumerExecutor;
protected volatile boolean stopped = false;
protected volatile boolean isReady = false;
protected final ActorSystemContext actorContext;
protected final DataDecodingEncodingService encodingService;
protected final TbTenantProfileCache tenantProfileCache;
protected final TbDeviceProfileCache deviceProfileCache;
protected final TbAssetProfileCache assetProfileCache;
@ -79,24 +78,6 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer;
protected final Optional<JwtSettingsService> jwtSettingsService;
public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache,
TbAssetProfileCache assetProfileCache, TbApiUsageStateService apiUsageStateService,
PartitionService partitionService, ApplicationEventPublisher eventPublisher,
TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer, Optional<JwtSettingsService> jwtSettingsService) {
this.actorContext = actorContext;
this.encodingService = encodingService;
this.tenantProfileCache = tenantProfileCache;
this.deviceProfileCache = deviceProfileCache;
this.assetProfileCache = assetProfileCache;
this.apiUsageStateService = apiUsageStateService;
this.partitionService = partitionService;
this.eventPublisher = eventPublisher;
this.nfConsumer = nfConsumer;
this.jwtSettingsService = jwtSettingsService;
}
public void init(String nfConsumerThreadName) {
this.notificationsConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName(nfConsumerThreadName));
}
@ -166,12 +147,6 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
});
}
// To be removed in 3.6.1 in favour of handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg)
protected void handleComponentLifecycleMsg(UUID id, ByteString nfMsg) {
Optional<TbActorMsg> actorMsgOpt = encodingService.decode(nfMsg.toByteArray());
actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg));
}
protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) {
if (actorMsg instanceof ComponentLifecycleMsg) {
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;

View File

@ -20,7 +20,6 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -46,6 +45,7 @@ import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.sync.vc.request.create.VersionCreateRequest;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.CommitRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.EntitiesContentRequestMsg;
@ -60,7 +60,6 @@ import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.TbQueueMsgMetadata;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.sync.vc.data.ClearRepositoryGitRequest;
import org.thingsboard.server.service.sync.vc.data.CommitGitRequest;
@ -96,7 +95,6 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private final TbServiceInfoProvider serviceInfoProvider;
private final TbClusterService clusterService;
private final DataDecodingEncodingService encodingService;
private final DefaultEntitiesVersionControlService entitiesVersionControlService;
private final SchedulerComponent scheduler;
@ -109,12 +107,10 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
private int msgChunkSize;
public DefaultGitVersionControlQueueService(TbServiceInfoProvider serviceInfoProvider, TbClusterService clusterService,
DataDecodingEncodingService encodingService,
@Lazy DefaultEntitiesVersionControlService entitiesVersionControlService,
SchedulerComponent scheduler) {
this.serviceInfoProvider = serviceInfoProvider;
this.clusterService = clusterService;
this.encodingService = encodingService;
this.entitiesVersionControlService = entitiesVersionControlService;
this.scheduler = scheduler;
}
@ -588,7 +584,7 @@ public class DefaultGitVersionControlQueueService implements GitVersionControlQu
vcSettings = entitiesVersionControlService.getVersionControlSettings(tenantId);
}
if (vcSettings != null) {
builder.setVcSettings(ByteString.copyFrom(encodingService.encode(vcSettings)));
builder.setVcSettings(ProtoUtils.toProto(vcSettings));
} else if (request.requiresSettings()) {
throw new RuntimeException("No entity version control settings provisioned!");
}

View File

@ -99,7 +99,6 @@ import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceLwM2MC
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@ -139,7 +138,6 @@ public class DefaultTransportApiService implements TransportApiService {
private final DeviceCredentialsService deviceCredentialsService;
private final DbCallbackExecutorService dbCallbackExecutorService;
private final TbClusterService tbClusterService;
private final DataDecodingEncodingService dataDecodingEncodingService;
private final DeviceProvisionService deviceProvisionService;
private final ResourceService resourceService;
private final OtaPackageService otaPackageService;
@ -388,7 +386,7 @@ public class DefaultTransportApiService implements TransportApiService {
.setDeviceInfo(ProtoUtils.toDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
builder.setDeviceProfile(ProtoUtils.toProto(deviceProfile));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
@ -465,13 +463,13 @@ public class DefaultTransportApiService implements TransportApiService {
if (entityType.equals(EntityType.DEVICE_PROFILE)) {
DeviceProfileId deviceProfileId = new DeviceProfileId(entityUuid);
DeviceProfile deviceProfile = deviceProfileCache.find(deviceProfileId);
builder.setData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
builder.setDeviceProfile(ProtoUtils.toProto(deviceProfile));
} else if (entityType.equals(EntityType.TENANT)) {
TenantId tenantId = TenantId.fromUUID(entityUuid);
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
ApiUsageState state = apiUsageStateService.getApiUsageState(tenantId);
builder.setData(ByteString.copyFrom(dataDecodingEncodingService.encode(tenantProfile)));
builder.setApiState(ByteString.copyFrom(dataDecodingEncodingService.encode(state)));
builder.setTenantProfile(ProtoUtils.toProto(tenantProfile));
builder.setApiState(ProtoUtils.toProto(state));
} else {
throw new RuntimeException("Invalid entity profile request: " + entityType);
}
@ -490,7 +488,7 @@ public class DefaultTransportApiService implements TransportApiService {
.setDeviceProfileIdMSB(deviceProfileId.getMostSignificantBits())
.setDeviceProfileIdLSB(deviceProfileId.getLeastSignificantBits())
.setDeviceTransportConfiguration(ByteString.copyFrom(
dataDecodingEncodingService.encode(device.getDeviceData().getTransportConfiguration())
JacksonUtil.writeValueAsBytes(device.getDeviceData().getTransportConfiguration())
)))
.build();
} else {
@ -506,11 +504,10 @@ public class DefaultTransportApiService implements TransportApiService {
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder()
.setDeviceCredentialsResponseMsg(TransportProtos.GetDeviceCredentialsResponseMsg.newBuilder()
.setDeviceCredentialsData(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceCredentials))))
.setDeviceCredentialsData(ProtoUtils.toProto(deviceCredentials)))
.build());
}
private ListenableFuture<TransportApiResponseMsg> handle(GetResourceRequestMsg requestMsg) {
TenantId tenantId = TenantId.fromUUID(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
ResourceType resourceType = ResourceType.valueOf(requestMsg.getResourceType());
@ -523,7 +520,7 @@ public class DefaultTransportApiService implements TransportApiService {
}
if (resource != null) {
builder.setResource(ByteString.copyFrom(dataDecodingEncodingService.encode(resource)));
builder.setResource(ProtoUtils.toProto(resource));
}
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build());
@ -556,7 +553,7 @@ public class DefaultTransportApiService implements TransportApiService {
builder.setDeviceInfo(ProtoUtils.toDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
builder.setDeviceProfile(ProtoUtils.toProto(deviceProfile));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}

View File

@ -95,7 +95,6 @@ import org.thingsboard.server.gen.edge.v1.TenantUpdateMsg;
import org.thingsboard.server.gen.edge.v1.UpdateMsgType;
import org.thingsboard.server.gen.edge.v1.UplinkMsg;
import org.thingsboard.server.gen.edge.v1.UserUpdateMsg;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import java.util.ArrayList;
import java.util.List;
@ -124,9 +123,6 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
@Autowired
protected EdgeEventService edgeEventService;
@Autowired
protected DataDecodingEncodingService dataDecodingEncodingService;
@Autowired
protected TbClusterService clusterService;
@ -165,7 +161,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
private RuleChainId getEdgeRootRuleChainId() throws Exception {
List<RuleChain> edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?",
new TypeReference<PageData<RuleChain>>() {}, new PageLink(100)).getData();
new TypeReference<PageData<RuleChain>>() {
}, new PageLink(100)).getData();
for (RuleChain edgeRuleChain : edgeRuleChains) {
if (edgeRuleChain.isRoot()) {
return edgeRuleChain.getId();
@ -182,7 +179,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
doDelete("/api/edge/" + edge.getId().toString())
.andExpect(status().isOk());
edgeImitator.disconnect();
} catch (Exception ignored) {}
} catch (Exception ignored) {
}
}
private void installation() throws Exception {
@ -379,7 +377,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
Device device = doGet("/api/device/" + deviceUUID, Device.class);
Assert.assertNotNull(device);
List<DeviceInfo> edgeDevices = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/devices?",
new TypeReference<PageData<DeviceInfo>>() {}, new PageLink(100)).getData();
new TypeReference<PageData<DeviceInfo>>() {
}, new PageLink(100)).getData();
Assert.assertTrue(edgeDevices.stream().map(DeviceInfo::getId).anyMatch(id -> id.equals(device.getId())));
testAutoGeneratedCodeByProtobuf(deviceUpdateMsg);
@ -398,7 +397,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
Asset asset = doGet("/api/asset/" + assetUUID, Asset.class);
Assert.assertNotNull(asset);
List<Asset> edgeAssets = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/assets?",
new TypeReference<PageData<Asset>>() {}, new PageLink(100)).getData();
new TypeReference<PageData<Asset>>() {
}, new PageLink(100)).getData();
Assert.assertTrue(edgeAssets.contains(asset));
testAutoGeneratedCodeByProtobuf(assetUpdateMsg);
@ -418,7 +418,8 @@ abstract public class AbstractEdgeTest extends AbstractControllerTest {
RuleChain ruleChain = doGet("/api/ruleChain/" + ruleChainUUID, RuleChain.class);
Assert.assertNotNull(ruleChain);
List<RuleChain> edgeRuleChains = doGetTypedWithPageLink("/api/edge/" + edge.getUuidId() + "/ruleChains?",
new TypeReference<PageData<RuleChain>>() {}, new PageLink(100)).getData();
new TypeReference<PageData<RuleChain>>() {
}, new PageLink(100)).getData();
Assert.assertTrue(edgeRuleChains.contains(ruleChain));
testAutoGeneratedCodeByProtobuf(ruleChainUpdateMsg);
}

View File

@ -59,7 +59,6 @@ import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.edge.v1.EdgeVersion;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorFactory;
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorV1;
import org.thingsboard.server.service.edge.rpc.constructor.alarm.AlarmMsgConstructorV2;
@ -477,9 +476,6 @@ public abstract class BaseEdgeProcessorTest {
@MockBean
protected DbCallbackExecutorService dbCallbackExecutorService;
@MockBean
protected DataDecodingEncodingService dataDecodingEncodingService;
protected EdgeId edgeId;
protected TenantId tenantId;

View File

@ -74,8 +74,6 @@ public abstract class AbstractDeviceProcessorTest extends BaseEdgeProcessorTest
willReturn(device).given(deviceService).findDeviceById(tenantId, deviceId);
willReturn(deviceProfile).given(deviceProfileService).findDeviceProfileById(tenantId, deviceProfileId);
willReturn(new byte[]{0x00}).given(dataDecodingEncodingService).encode(deviceProfileData);
}
protected void updateDeviceProfileDefaultFields(long expectedDashboardIdMSB, long expectedDashboardIdLSB,

View File

@ -32,10 +32,9 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
@ -64,8 +63,6 @@ public class DefaultTbClusterServiceTest {
public static final String TRANSPORT = "transport";
@MockBean
protected DataDecodingEncodingService encodingService;
@MockBean
protected TbDeviceProfileCache deviceProfileCache;
@MockBean

View File

@ -47,7 +47,6 @@ import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.resource.ResourceService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
@ -92,8 +91,6 @@ public class DefaultTransportApiServiceTest {
@MockBean
protected TbClusterService tbClusterService;
@MockBean
protected DataDecodingEncodingService dataDecodingEncodingService;
@MockBean
protected DeviceProvisionService deviceProvisionService;
@MockBean
protected ResourceService resourceService;

View File

@ -38,13 +38,13 @@ public class DeviceRedisCache extends RedisTbTransactionalCache<DeviceCacheKey,
@Override
public byte[] serialize(Device device) throws SerializationException {
return ProtoUtils.toDeviceProto(device).toByteArray();
return ProtoUtils.toProto(device).toByteArray();
}
@Override
public Device deserialize(DeviceCacheKey key, byte[] bytes) throws SerializationException {
try {
return ProtoUtils.fromDeviceProto(TransportProtos.DeviceProto.parseFrom(bytes));
return ProtoUtils.fromProto(TransportProtos.DeviceProto.parseFrom(bytes));
} catch (InvalidProtocolBufferException e) {
throw new SerializationException(e.getMessage());
}

View File

@ -17,18 +17,25 @@ package org.thingsboard.server.common.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileProvisionType;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.ResourceType;
import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.Lwm2mDeviceTransportConfiguration;
import org.thingsboard.server.common.data.device.data.PowerMode;
import org.thingsboard.server.common.data.device.data.PowerSavingConfiguration;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
@ -38,7 +45,9 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.OtaPackageId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TbResourceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.kv.AttributeKey;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
@ -53,6 +62,8 @@ import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.common.data.sync.vc.RepositoryAuthMethod;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg;
import org.thingsboard.server.common.msg.edge.EdgeEventUpdateMsg;
import org.thingsboard.server.common.msg.edge.FromEdgeSyncResponse;
@ -78,6 +89,7 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
public class ProtoUtils {
private static final EntityType[] entityTypeByProtoNumber;
@ -517,54 +529,79 @@ public class ProtoUtils {
return result;
}
public static TransportProtos.DeviceProto toDeviceProto(Device device) {
public static TransportProtos.DeviceProto toProto(Device device) {
var builder = TransportProtos.DeviceProto.newBuilder()
.setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits())
.setCustomerIdMSB(getMsb(device.getCustomerId()))
.setCustomerIdLSB(getLsb(device.getCustomerId()))
.setDeviceIdMSB(device.getId().getId().getMostSignificantBits())
.setDeviceIdLSB(device.getId().getId().getLeastSignificantBits())
.setCreatedTime(device.getCreatedTime())
.setDeviceName(device.getName())
.setDeviceLabel(toProtoString(device.getLabel()))
.setDeviceType(device.getType())
.setDeviceProfileIdMSB(device.getDeviceProfileId().getId().getMostSignificantBits())
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits())
.setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo()))
.setFirmwareIdMSB(getMsb(device.getFirmwareId()))
.setFirmwareIdLSB(getLsb(device.getFirmwareId()))
.setSoftwareIdMSB(getMsb(device.getSoftwareId()))
.setSoftwareIdLSB(getLsb(device.getSoftwareId()))
.setExternalIdMSB(getMsb(device.getExternalId()))
.setExternalIdLSB(getLsb(device.getExternalId()));
.setDeviceProfileIdLSB(device.getDeviceProfileId().getId().getLeastSignificantBits());
if (device.getDeviceDataBytes() != null) {
if (isNotNull(device.getCustomerId())) {
builder.setCustomerIdMSB(getMsb(device.getCustomerId()))
.setCustomerIdLSB(getLsb(device.getCustomerId()));
}
if (isNotNull(device.getLabel())) {
builder.setDeviceLabel(device.getLabel());
}
if (isNotNull(device.getAdditionalInfo())) {
builder.setAdditionalInfo(JacksonUtil.toString(device.getAdditionalInfo()));
}
if (isNotNull(device.getFirmwareId())) {
builder.setFirmwareIdMSB(getMsb(device.getFirmwareId()))
.setFirmwareIdLSB(getLsb(device.getFirmwareId()));
}
if (isNotNull(device.getSoftwareId())) {
builder.setSoftwareIdMSB(getMsb(device.getSoftwareId()))
.setSoftwareIdLSB(getLsb(device.getSoftwareId()));
}
if (isNotNull(device.getExternalId())) {
builder.setExternalIdMSB(getMsb(device.getExternalId()))
.setExternalIdLSB(getLsb(device.getExternalId()));
}
if (isNotNull(device.getDeviceDataBytes())) {
builder.setDeviceData(ByteString.copyFrom(device.getDeviceDataBytes()));
}
return builder.build();
}
public static Device fromDeviceProto(TransportProtos.DeviceProto proto) {
public static Device fromProto(TransportProtos.DeviceProto proto) {
Device device = new Device(getEntityId(proto.getDeviceIdMSB(), proto.getDeviceIdLSB(), DeviceId::new));
device.setCreatedTime(proto.getCreatedTime());
device.setTenantId(getEntityId(proto.getTenantIdMSB(), proto.getTenantIdLSB(), TenantId::new));
device.setCustomerId(getEntityId(proto.getCustomerIdMSB(), proto.getCustomerIdLSB(), CustomerId::new));
device.setName(proto.getDeviceName());
device.setLabel(fromProtoString(proto.getDeviceLabel()));
device.setType(proto.getDeviceType());
device.setDeviceProfileId(getEntityId(proto.getDeviceProfileIdMSB(), proto.getDeviceProfileIdLSB(), DeviceProfileId::new));
device.setAdditionalInfo(JacksonUtil.toJsonNode(proto.getAdditionalInfo()));
device.setFirmwareId(getEntityId(proto.getFirmwareIdMSB(), proto.getFirmwareIdLSB(), OtaPackageId::new));
device.setSoftwareId(getEntityId(proto.getSoftwareIdMSB(), proto.getSoftwareIdLSB(), OtaPackageId::new));
device.setExternalId(getEntityId(proto.getExternalIdMSB(), proto.getExternalIdLSB(), DeviceId::new));
device.setDeviceDataBytes(proto.getDeviceData().toByteArray());
if (proto.hasCustomerIdMSB() && proto.hasCustomerIdLSB()) {
device.setCustomerId(getEntityId(proto.getCustomerIdMSB(), proto.getCustomerIdLSB(), CustomerId::new));
}
if (proto.hasDeviceLabel()) {
device.setLabel(proto.getDeviceLabel());
}
if (proto.hasAdditionalInfo()) {
device.setAdditionalInfo(JacksonUtil.toJsonNode(proto.getAdditionalInfo()));
}
if (proto.hasFirmwareIdMSB() && proto.hasFirmwareIdLSB()) {
device.setFirmwareId(getEntityId(proto.getFirmwareIdMSB(), proto.getFirmwareIdLSB(), OtaPackageId::new));
}
if (proto.hasSoftwareIdMSB() && proto.hasSoftwareIdLSB()) {
device.setSoftwareId(getEntityId(proto.getSoftwareIdMSB(), proto.getSoftwareIdLSB(), OtaPackageId::new));
}
if (proto.hasExternalIdMSB() && proto.hasExternalIdLSB()) {
device.setExternalId(getEntityId(proto.getExternalIdMSB(), proto.getExternalIdLSB(), DeviceId::new));
}
if (proto.hasDeviceData()) {
device.setDeviceDataBytes(proto.getDeviceData().toByteArray());
}
return device;
}
public static TransportProtos.DeviceProfileProto toDeviceProfileProto(DeviceProfile deviceProfile) {
return TransportProtos.DeviceProfileProto.newBuilder()
public static TransportProtos.DeviceProfileProto toProto(DeviceProfile deviceProfile) {
var builder = TransportProtos.DeviceProfileProto.newBuilder()
.setTenantIdMSB(getMsb(deviceProfile.getTenantId()))
.setTenantIdLSB(getLsb(deviceProfile.getTenantId()))
.setDeviceProfileIdMSB(getMsb(deviceProfile.getId()))
@ -574,27 +611,51 @@ public class ProtoUtils {
.setIsDefault(deviceProfile.isDefault())
.setType(deviceProfile.getType().name())
.setTransportType(deviceProfile.getTransportType().name())
.setProvisionType(deviceProfile.getProvisionType().name())
.setDeviceProfileData(ByteString.copyFrom(deviceProfile.getProfileDataBytes()))
.setDescription(toProtoString(deviceProfile.getDescription()))
.setImage(toProtoString(deviceProfile.getImage()))
.setDefaultRuleChainIdMSB(getMsb(deviceProfile.getDefaultRuleChainId()))
.setDefaultRuleChainIdLSB(getMsb(deviceProfile.getDefaultRuleChainId()))
.setDefaultDashboardIdMSB(getMsb(deviceProfile.getDefaultDashboardId()))
.setDefaultDashboardIdLSB(getLsb(deviceProfile.getDefaultDashboardId()))
.setDefaultQueueName(toProtoString(deviceProfile.getDefaultQueueName()))
.setProvisionDeviceKey(toProtoString(deviceProfile.getProvisionDeviceKey()))
.setFirmwareIdMSB(getMsb(deviceProfile.getFirmwareId()))
.setFirmwareIdLSB(getLsb(deviceProfile.getFirmwareId()))
.setSoftwareIdMSB(getMsb(deviceProfile.getSoftwareId()))
.setSoftwareIdLSB(getLsb(deviceProfile.getSoftwareId()))
.setDefaultEdgeRuleChainIdMSB(getMsb(deviceProfile.getDefaultEdgeRuleChainId()))
.setDefaultEdgeRuleChainIdLSB(getLsb(deviceProfile.getDefaultEdgeRuleChainId()))
.setExternalIdMSB(getMsb(deviceProfile.getExternalId()))
.setExternalIdLSB(getLsb(deviceProfile.getExternalId())).build();
.setProvisionType(deviceProfile.getProvisionType().name());
if (isNotNull(deviceProfile.getProfileDataBytes())) {
builder.setDeviceProfileData(ByteString.copyFrom(deviceProfile.getProfileDataBytes()));
}
if (isNotNull(deviceProfile.getDescription())) {
builder.setDescription(deviceProfile.getDescription());
}
if (isNotNull(deviceProfile.getImage())) {
builder.setImage(deviceProfile.getImage());
}
if (isNotNull(deviceProfile.getDefaultRuleChainId())) {
builder.setDefaultRuleChainIdMSB(getMsb(deviceProfile.getDefaultRuleChainId()))
.setDefaultRuleChainIdLSB(getMsb(deviceProfile.getDefaultRuleChainId()));
}
if (isNotNull(deviceProfile.getDefaultDashboardId())) {
builder.setDefaultDashboardIdMSB(getMsb(deviceProfile.getDefaultDashboardId()))
.setDefaultDashboardIdLSB(getLsb(deviceProfile.getDefaultDashboardId()));
}
if (isNotNull(deviceProfile.getDefaultQueueName())) {
builder.setDefaultQueueName(deviceProfile.getDefaultQueueName());
}
if (isNotNull(deviceProfile.getProvisionDeviceKey())) {
builder.setProvisionDeviceKey(deviceProfile.getProvisionDeviceKey());
}
if (isNotNull(deviceProfile.getFirmwareId())) {
builder.setFirmwareIdMSB(getMsb(deviceProfile.getFirmwareId()))
.setFirmwareIdLSB(getLsb(deviceProfile.getFirmwareId()));
}
if (isNotNull(deviceProfile.getSoftwareId())) {
builder.setSoftwareIdMSB(getMsb(deviceProfile.getSoftwareId()))
.setSoftwareIdLSB(getLsb(deviceProfile.getSoftwareId()));
}
if (isNotNull(deviceProfile.getExternalId())) {
builder.setExternalIdMSB(getMsb(deviceProfile.getExternalId()))
.setExternalIdLSB(getLsb(deviceProfile.getExternalId()));
}
if (isNotNull(deviceProfile.getDefaultEdgeRuleChainId())) {
builder.setDefaultEdgeRuleChainIdMSB(getMsb(deviceProfile.getDefaultEdgeRuleChainId()))
.setDefaultEdgeRuleChainIdLSB(getLsb(deviceProfile.getDefaultEdgeRuleChainId()));
}
return builder.build();
}
public static DeviceProfile fromDeviceProfileProto(TransportProtos.DeviceProfileProto proto) {
public static DeviceProfile fromProto(TransportProtos.DeviceProfileProto proto) {
DeviceProfile deviceProfile = new DeviceProfile(getEntityId(proto.getDeviceProfileIdMSB(), proto.getDeviceProfileIdLSB(), DeviceProfileId::new));
deviceProfile.setCreatedTime(proto.getCreatedTime());
deviceProfile.setTenantId(getEntityId(proto.getTenantIdMSB(), proto.getTenantIdLSB(), TenantId::new));
@ -603,21 +664,343 @@ public class ProtoUtils {
deviceProfile.setType(DeviceProfileType.valueOf(proto.getType()));
deviceProfile.setTransportType(DeviceTransportType.valueOf(proto.getTransportType()));
deviceProfile.setProvisionType(DeviceProfileProvisionType.valueOf(proto.getProvisionType()));
deviceProfile.setProfileDataBytes(proto.getDeviceProfileData().toByteArray());
deviceProfile.setDescription(fromProtoString(proto.getDescription()));
deviceProfile.setImage(fromProtoString(proto.getImage()));
deviceProfile.setDefaultRuleChainId(getEntityId(proto.getDefaultRuleChainIdMSB(), proto.getDefaultRuleChainIdLSB(), RuleChainId::new));
deviceProfile.setDefaultDashboardId(getEntityId(proto.getDefaultDashboardIdMSB(), proto.getDefaultDashboardIdLSB(), DashboardId::new));
deviceProfile.setDefaultQueueName(fromProtoString(proto.getDefaultQueueName()));
deviceProfile.setProvisionDeviceKey(fromProtoString(proto.getProvisionDeviceKey()));
deviceProfile.setFirmwareId(getEntityId(proto.getFirmwareIdMSB(), proto.getFirmwareIdLSB(), OtaPackageId::new));
deviceProfile.setSoftwareId(getEntityId(proto.getSoftwareIdMSB(), proto.getSoftwareIdLSB(), OtaPackageId::new));
deviceProfile.setDefaultEdgeRuleChainId(getEntityId(proto.getDefaultEdgeRuleChainIdMSB(), proto.getDefaultEdgeRuleChainIdLSB(), RuleChainId::new));
deviceProfile.setExternalId(getEntityId(proto.getExternalIdMSB(), proto.getExternalIdLSB(), DeviceProfileId::new));
if (proto.hasDeviceProfileData()) {
deviceProfile.setProfileDataBytes(proto.getDeviceProfileData().toByteArray());
}
if (proto.hasDescription()) {
deviceProfile.setDescription(proto.getDescription());
}
if (proto.hasImage()) {
deviceProfile.setImage(proto.getImage());
}
if (proto.hasDefaultRuleChainIdMSB() && proto.hasDefaultRuleChainIdLSB()) {
deviceProfile.setDefaultRuleChainId(getEntityId(proto.getDefaultRuleChainIdMSB(), proto.getDefaultRuleChainIdLSB(), RuleChainId::new));
deviceProfile.setDefaultDashboardId(getEntityId(proto.getDefaultDashboardIdMSB(), proto.getDefaultDashboardIdLSB(), DashboardId::new));
}
if (proto.hasDefaultQueueName()) {
deviceProfile.setDefaultQueueName(proto.getDefaultQueueName());
}
if (proto.hasProvisionDeviceKey()) {
deviceProfile.setProvisionDeviceKey(proto.getProvisionDeviceKey());
}
if (proto.hasFirmwareIdMSB() && proto.hasFirmwareIdLSB()) {
deviceProfile.setFirmwareId(getEntityId(proto.getFirmwareIdMSB(), proto.getFirmwareIdLSB(), OtaPackageId::new));
}
if (proto.hasSoftwareIdMSB() && proto.hasSoftwareIdLSB()) {
deviceProfile.setSoftwareId(getEntityId(proto.getSoftwareIdMSB(), proto.getSoftwareIdLSB(), OtaPackageId::new));
}
if (proto.hasExternalIdMSB() && proto.hasExternalIdLSB()) {
deviceProfile.setExternalId(getEntityId(proto.getExternalIdMSB(), proto.getExternalIdLSB(), DeviceProfileId::new));
}
if (proto.hasDefaultEdgeRuleChainIdMSB() && proto.hasDefaultEdgeRuleChainIdLSB()) {
deviceProfile.setDefaultEdgeRuleChainId(getEntityId(proto.getDefaultEdgeRuleChainIdMSB(), proto.getDefaultEdgeRuleChainIdLSB(), RuleChainId::new));
}
return deviceProfile;
}
public static TransportProtos.TenantProto toProto(Tenant tenant) {
var builder = TransportProtos.TenantProto.newBuilder()
.setTenantIdMSB(getMsb(tenant.getTenantId()))
.setTenantIdLSB(getLsb(tenant.getTenantId()))
.setCreatedTime(tenant.getCreatedTime())
.setTenantProfileIdMSB(getMsb(tenant.getTenantProfileId()))
.setTenantProfileIdLSB(getLsb(tenant.getTenantProfileId()))
.setTitle(tenant.getTitle());
if (isNotNull(tenant.getRegion())) {
builder.setRegion(tenant.getRegion());
}
if (isNotNull(tenant.getCountry())) {
builder.setCountry(tenant.getCountry());
}
if (isNotNull(tenant.getState())) {
builder.setState(tenant.getState());
}
if (isNotNull(tenant.getCity())) {
builder.setCity(tenant.getCity());
}
if (isNotNull(tenant.getAddress())) {
builder.setAddress(tenant.getAddress());
}
if (isNotNull(tenant.getAddress2())) {
builder.setAddress2(tenant.getAddress2());
}
if (isNotNull(tenant.getZip())) {
builder.setZip(tenant.getZip());
}
if (isNotNull(tenant.getPhone())) {
builder.setPhone(tenant.getPhone());
}
if (isNotNull(tenant.getEmail())) {
builder.setEmail(tenant.getEmail());
}
if (isNotNull(tenant.getAdditionalInfo())) {
builder.setAdditionalInfo(JacksonUtil.toString(tenant.getAdditionalInfo()));
}
return builder.build();
}
public static Tenant fromProto(TransportProtos.TenantProto proto) {
Tenant tenant = new Tenant(getEntityId(proto.getTenantIdMSB(), proto.getTenantIdLSB(), TenantId::new));
tenant.setCreatedTime(proto.getCreatedTime());
tenant.setTenantProfileId(getEntityId(proto.getTenantProfileIdMSB(), proto.getTenantProfileIdLSB(), TenantProfileId::new));
tenant.setTitle(tenant.getTitle());
if (proto.hasRegion()) {
tenant.setRegion(proto.getRegion());
}
if (proto.hasCountry()) {
tenant.setCountry(proto.getCountry());
}
if (proto.hasState()) {
tenant.setState(proto.getState());
}
if (proto.hasCity()) {
tenant.setCity(proto.getCity());
}
if (proto.hasAddress()) {
tenant.setAddress(proto.getAddress());
}
if (proto.hasAddress2()) {
tenant.setAddress2(proto.getAddress2());
}
if (proto.hasZip()) {
tenant.setZip(proto.getZip());
}
if (proto.hasPhone()) {
tenant.setPhone(proto.getPhone());
}
if (proto.hasEmail()) {
tenant.setEmail(proto.getEmail());
}
if (proto.hasAdditionalInfo()) {
tenant.setAdditionalInfo(JacksonUtil.toJsonNode(proto.getAdditionalInfo()));
}
return tenant;
}
public static TransportProtos.TenantProfileProto toProto(TenantProfile tenantProfile) {
var builder = TransportProtos.TenantProfileProto.newBuilder()
.setTenantProfileIdMSB(getMsb(tenantProfile.getId()))
.setTenantProfileIdLSB(getLsb(tenantProfile.getId()))
.setCreatedTime(tenantProfile.getCreatedTime())
.setName(tenantProfile.getName())
.setIsDefault(tenantProfile.isDefault())
.setIsolatedTbRuleEngine(tenantProfile.isIsolatedTbRuleEngine());
if (isNotNull(tenantProfile.getDescription())) {
builder.setDescription(tenantProfile.getDescription());
}
if (isNotNull(tenantProfile.getProfileDataBytes())) {
builder.setProfileData(ByteString.copyFrom(tenantProfile.getProfileDataBytes()));
}
return builder.build();
}
public static TenantProfile fromProto(TransportProtos.TenantProfileProto proto) {
TenantProfile tenantProfile = new TenantProfile(getEntityId(proto.getTenantProfileIdMSB(), proto.getTenantProfileIdLSB(), TenantProfileId::new));
tenantProfile.setCreatedTime(proto.getCreatedTime());
tenantProfile.setName(proto.getName());
tenantProfile.setDefault(proto.getIsDefault());
tenantProfile.setIsolatedTbRuleEngine(proto.getIsolatedTbRuleEngine());
if (proto.hasDescription()) {
tenantProfile.setDescription(proto.getDescription());
}
if (proto.hasProfileData()) {
tenantProfile.setProfileDataBytes(proto.getProfileData().toByteArray());
}
return tenantProfile;
}
public static TransportProtos.TbResourceProto toProto(TbResource resource) {
var builder = TransportProtos.TbResourceProto.newBuilder()
.setTenantIdMSB(getMsb(resource.getTenantId()))
.setTenantIdLSB(getLsb(resource.getTenantId()))
.setResourceIdMSB(getMsb(resource.getId()))
.setResourceIdLSB(getLsb(resource.getId()))
.setCreatedTime(resource.getCreatedTime())
.setTitle(resource.getTitle())
.setResourceType(resource.getResourceType().name())
.setResourceKey(resource.getResourceKey())
.setSearchText(resource.getSearchText())
.setFileName(resource.getFileName());
if (isNotNull(resource.getEtag())) {
builder.setEtag(resource.getEtag());
}
if (isNotNull(resource.getDescriptor())) {
builder.setResourceDescriptor(JacksonUtil.toString(resource.getDescriptor()));
}
if (isNotNull(resource.getExternalId())) {
builder.setExternalIdMSB(getMsb(resource.getExternalId()))
.setExternalIdLSB(getLsb(resource.getExternalId()));
}
if (isNotNull(resource.getData())) {
builder.setData(ByteString.copyFrom(resource.getData()));
}
if (isNotNull(resource.getPreview())) {
builder.setPreview(ByteString.copyFrom(resource.getPreview()));
}
return builder.build();
}
public static TbResource fromProto(TransportProtos.TbResourceProto proto) {
TbResource resource = new TbResource(getEntityId(proto.getResourceIdMSB(), proto.getResourceIdLSB(), TbResourceId::new));
resource.setTenantId(getEntityId(proto.getTenantIdMSB(), proto.getTenantIdLSB(), TenantId::new));
resource.setCreatedTime(proto.getCreatedTime());
resource.setTitle(proto.getTitle());
resource.setResourceType(ResourceType.valueOf(proto.getResourceType()));
resource.setResourceKey(proto.getResourceKey());
resource.setSearchText(proto.getSearchText());
resource.setFileName(proto.getFileName());
if (proto.hasEtag()) {
resource.setEtag(proto.getEtag());
}
if (proto.hasResourceDescriptor()) {
resource.setDescriptor(JacksonUtil.toJsonNode(proto.getResourceDescriptor()));
}
if (proto.hasExternalIdMSB() && proto.hasExternalIdLSB()) {
resource.setExternalId(getEntityId(proto.getExternalIdMSB(), proto.getExternalIdLSB(), TbResourceId::new));
}
if (proto.hasData()) {
resource.setData(proto.getData().toByteArray());
}
if (proto.hasPreview()) {
resource.setPreview(proto.getPreview().toByteArray());
}
return resource;
}
public static TransportProtos.ApiUsageStateProto toProto(ApiUsageState apiUsageState) {
return TransportProtos.ApiUsageStateProto.newBuilder()
.setTenantProfileIdMSB(getMsb(apiUsageState.getTenantId()))
.setTenantProfileIdLSB(getLsb(apiUsageState.getTenantId()))
.setApiUsageStateIdMSB(getMsb(apiUsageState.getId()))
.setApiUsageStateIdLSB(getLsb(apiUsageState.getId()))
.setCreatedTime(apiUsageState.getCreatedTime())
.setEntityType(TransportProtos.EntityTypeProto.forNumber(apiUsageState.getEntityId().getEntityType().ordinal()))
.setEntityIdMSB(getMsb(apiUsageState.getEntityId()))
.setEntityIdLSB(getLsb(apiUsageState.getEntityId()))
.setTransportState(apiUsageState.getTransportState().name())
.setDbStorageState(apiUsageState.getDbStorageState().name())
.setReExecState(apiUsageState.getReExecState().name())
.setJsExecState(apiUsageState.getJsExecState().name())
.setTbelExecState(apiUsageState.getTbelExecState().name())
.setEmailExecState(apiUsageState.getEmailExecState().name())
.setSmsExecState(apiUsageState.getSmsExecState().name())
.setAlarmExecState(apiUsageState.getAlarmExecState().name()).build();
}
public static ApiUsageState fromProto(TransportProtos.ApiUsageStateProto proto) {
ApiUsageState apiUsageState = new ApiUsageState(getEntityId(proto.getApiUsageStateIdMSB(), proto.getApiUsageStateIdLSB(), ApiUsageStateId::new));
apiUsageState.setTenantId(getEntityId(proto.getTenantProfileIdMSB(), proto.getTenantProfileIdLSB(), TenantId::new));
apiUsageState.setCreatedTime(proto.getCreatedTime());
apiUsageState.setEntityId(EntityIdFactory.getByTypeAndUuid(fromProto(proto.getEntityType()), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())));
apiUsageState.setTransportState(ApiUsageStateValue.valueOf(proto.getTransportState()));
apiUsageState.setDbStorageState(ApiUsageStateValue.valueOf(proto.getDbStorageState()));
apiUsageState.setReExecState(ApiUsageStateValue.valueOf(proto.getReExecState()));
apiUsageState.setJsExecState(ApiUsageStateValue.valueOf(proto.getJsExecState()));
apiUsageState.setTbelExecState(ApiUsageStateValue.valueOf(proto.getTbelExecState()));
apiUsageState.setEmailExecState(ApiUsageStateValue.valueOf(proto.getEmailExecState()));
apiUsageState.setSmsExecState(ApiUsageStateValue.valueOf(proto.getSmsExecState()));
apiUsageState.setAlarmExecState(ApiUsageStateValue.valueOf(proto.getAlarmExecState()));
return apiUsageState;
}
public static TransportProtos.RepositorySettingsProto toProto(RepositorySettings repositorySettings) {
var builder = TransportProtos.RepositorySettingsProto.newBuilder()
.setRepositoryUri(repositorySettings.getRepositoryUri())
.setAuthMethod(repositorySettings.getAuthMethod().name())
.setReadOnly(repositorySettings.isReadOnly())
.setShowMergeCommits(repositorySettings.isShowMergeCommits());
if (isNotNull(repositorySettings.getUsername())) {
builder.setUsername(repositorySettings.getUsername());
}
if (isNotNull(repositorySettings.getPassword())) {
builder.setPassword(repositorySettings.getPassword());
}
if (isNotNull(repositorySettings.getPrivateKeyFileName())) {
builder.setPrivateKeyFileName(repositorySettings.getPrivateKeyFileName());
}
if (isNotNull(repositorySettings.getPrivateKey())) {
builder.setPrivateKey(repositorySettings.getPrivateKey());
}
if (isNotNull(repositorySettings.getPrivateKeyPassword())) {
builder.setPrivateKeyPassword(repositorySettings.getPrivateKeyPassword());
}
if (isNotNull(repositorySettings.getDefaultBranch())) {
builder.setDefaultBranch(repositorySettings.getDefaultBranch());
}
return builder.build();
}
public static RepositorySettings fromProto(TransportProtos.RepositorySettingsProto proto) {
RepositorySettings repositorySettings = new RepositorySettings();
repositorySettings.setRepositoryUri(proto.getRepositoryUri());
repositorySettings.setAuthMethod(RepositoryAuthMethod.valueOf(proto.getAuthMethod()));
repositorySettings.setReadOnly(proto.getReadOnly());
repositorySettings.setShowMergeCommits(proto.getShowMergeCommits());
if (proto.hasUsername()) {
repositorySettings.setUsername(proto.getUsername());
}
if (proto.hasPassword()) {
repositorySettings.setPassword(proto.getPassword());
}
if (proto.hasPrivateKeyFileName()) {
repositorySettings.setPrivateKeyFileName(proto.getPrivateKeyFileName());
}
if (proto.hasPrivateKey()) {
repositorySettings.setPrivateKey(proto.getPrivateKey());
}
if (proto.hasPrivateKeyPassword()) {
repositorySettings.setPrivateKeyPassword(proto.getPrivateKeyPassword());
}
if (proto.hasDefaultBranch()) {
repositorySettings.setDefaultBranch(proto.getDefaultBranch());
}
return repositorySettings;
}
public static TransportProtos.DeviceCredentialsProto toProto(DeviceCredentials deviceCredentials) {
TransportProtos.DeviceCredentialsProto.Builder builder = TransportProtos.DeviceCredentialsProto.newBuilder()
.setDeviceIdMSB(getMsb(deviceCredentials.getDeviceId()))
.setDeviceIdLSB(getLsb(deviceCredentials.getDeviceId()))
.setCredentialsId(deviceCredentials.getCredentialsId())
.setCredentialsType(TransportProtos.CredentialsType.valueOf(deviceCredentials.getCredentialsType().name()));
if (deviceCredentials.getCredentialsValue() != null) {
builder.setCredentialsValue(deviceCredentials.getCredentialsValue());
}
return builder.build();
}
public static DeviceCredentials fromProto(TransportProtos.DeviceCredentialsProto proto) {
DeviceCredentials deviceCredentials = new DeviceCredentials();
deviceCredentials.setDeviceId(getEntityId(proto.getDeviceIdMSB(), proto.getDeviceIdLSB(), DeviceId::new));
deviceCredentials.setCredentialsId(proto.getCredentialsId());
deviceCredentials.setCredentialsType(DeviceCredentialsType.valueOf(proto.getCredentialsType().name()));
deviceCredentials.setCredentialsValue(proto.hasCredentialsValue() ? proto.getCredentialsValue() : null);
return deviceCredentials;
}
public static <T> TransportProtos.EntityUpdateMsg toEntityUpdateProto(T entity) {
var builder = TransportProtos.EntityUpdateMsg.newBuilder();
if (entity instanceof Device) {
builder.setDevice(toProto((Device) entity));
} else if (entity instanceof DeviceProfile) {
builder.setDeviceProfile(toProto((DeviceProfile) entity));
} else if (entity instanceof Tenant) {
builder.setTenant(toProto((Tenant) entity));
} else if (entity instanceof TenantProfile) {
builder.setTenantProfile(toProto((TenantProfile) entity));
} else if (entity instanceof ApiUsageState) {
builder.setApiUsageState(toProto((ApiUsageState) entity));
} else {
log.warn("[{}] entity does not support toProto serialization .", entity.getClass().getSimpleName());
}
return builder.build();
}
public static TransportProtos.DeviceInfoProto toDeviceInfoProto(Device device) throws JsonProcessingException {
TransportProtos.DeviceInfoProto.Builder builder = TransportProtos.DeviceInfoProto.newBuilder()
.setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits())
@ -657,36 +1040,29 @@ public class ProtoUtils {
return builder.build();
}
private static boolean isNotNull(Object obj) {
return obj != null;
}
private static <T extends EntityId> T getEntityId(long msb, long lsb, Function<UUID, T> entityId) {
if (msb != 0 || lsb != 0) {
return entityId.apply(new UUID(msb, lsb));
}
return null;
}
private static String toProtoString(String str) {
return str != null ? str : "";
}
private static String fromProtoString(String str) {
return StringUtils.isNotEmpty(str) ? str : null;
return entityId.apply(new UUID(msb, lsb));
}
private static Long getMsb(EntityId entityId) {
if (entityId != null) {
if (isNotNull(entityId)) {
return entityId.getId().getMostSignificantBits();
}
return 0L;
}
private static Long getLsb(EntityId entityId) {
if (entityId != null) {
if (isNotNull(entityId)) {
return entityId.getId().getLeastSignificantBits();
}
return 0L;
}
private static Long checkLong(Long l) {
return l != null ? l : 0;
return isNotNull(l) ? l : 0;
}
}

View File

@ -190,20 +190,20 @@ message DeviceProto {
int64 deviceIdLSB = 4;
int64 createdTime = 5;
string deviceName = 6;
string deviceLabel = 7;
optional string deviceLabel = 7;
string deviceType = 8;
string additionalInfo = 9;
optional string additionalInfo = 9;
int64 deviceProfileIdMSB = 10;
int64 deviceProfileIdLSB = 11;
int64 customerIdMSB = 12;
int64 customerIdLSB = 13;
bytes deviceData = 14;
int64 firmwareIdMSB = 15;
int64 firmwareIdLSB = 16;
int64 softwareIdMSB = 17;
int64 softwareIdLSB = 18;
int64 externalIdMSB = 19;
int64 externalIdLSB = 20;
optional int64 customerIdMSB = 12;
optional int64 customerIdLSB = 13;
optional bytes deviceData = 14;
optional int64 firmwareIdMSB = 15;
optional int64 firmwareIdLSB = 16;
optional int64 softwareIdMSB = 17;
optional int64 softwareIdLSB = 18;
optional int64 externalIdMSB = 19;
optional int64 externalIdLSB = 20;
}
message DeviceProfileProto {
@ -213,27 +213,108 @@ message DeviceProfileProto {
int64 deviceProfileIdLSB = 4;
int64 createdTime = 5;
string name = 6;
string description = 7;
string image = 8;
optional string description = 7;
optional string image = 8;
bool isDefault = 9;
string type = 10;
string transportType = 11;
string provisionType = 12;
int64 defaultRuleChainIdMSB = 13;
int64 defaultRuleChainIdLSB = 14;
int64 defaultDashboardIdMSB = 15;
int64 defaultDashboardIdLSB = 16;
string defaultQueueName = 17;
bytes deviceProfileData = 18;
string provisionDeviceKey = 19;
int64 firmwareIdMSB = 20;
int64 firmwareIdLSB = 21;
int64 softwareIdMSB = 22;
int64 softwareIdLSB = 23;
int64 defaultEdgeRuleChainIdMSB = 24;
int64 defaultEdgeRuleChainIdLSB = 25;
int64 externalIdMSB = 26;
int64 externalIdLSB = 27;
optional int64 defaultRuleChainIdMSB = 13;
optional int64 defaultRuleChainIdLSB = 14;
optional int64 defaultDashboardIdMSB = 15;
optional int64 defaultDashboardIdLSB = 16;
optional string defaultQueueName = 17;
optional bytes deviceProfileData = 18;
optional string provisionDeviceKey = 19;
optional int64 firmwareIdMSB = 20;
optional int64 firmwareIdLSB = 21;
optional int64 softwareIdMSB = 22;
optional int64 softwareIdLSB = 23;
optional int64 defaultEdgeRuleChainIdMSB = 24;
optional int64 defaultEdgeRuleChainIdLSB = 25;
optional int64 externalIdMSB = 26;
optional int64 externalIdLSB = 27;
}
message TenantProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 createdTime = 3;
string title = 4;
optional string region = 5;
int64 tenantProfileIdMSB = 6;
int64 tenantProfileIdLSB = 7;
optional string country = 8;
optional string state = 9;
optional string city = 10;
optional string address = 11;
optional string address2 = 12;
optional string zip = 13;
optional string phone = 14;
optional string email = 15;
optional string additionalInfo = 16;
}
message TenantProfileProto {
int64 tenantProfileIdMSB = 1;
int64 tenantProfileIdLSB = 2;
int64 createdTime = 3;
string name = 4;
optional string description = 5;
bool isDefault = 6;
bool isolatedTbRuleEngine = 7;
optional bytes profileData = 8;
}
message TbResourceProto {
int64 tenantIdMSB = 1;
int64 tenantIdLSB = 2;
int64 resourceIdMSB = 3;
int64 resourceIdLSB = 4;
int64 createdTime = 5;
string title = 6;
string resourceType = 7;
string resourceKey = 8;
string searchText = 9;
optional string etag = 10;
string fileName = 11;
optional string resourceDescriptor = 12;
optional int64 externalIdMSB = 13;
optional int64 externalIdLSB = 14;
optional bytes data = 15;
optional bytes preview = 16;
}
message ApiUsageStateProto {
int64 tenantProfileIdMSB = 1;
int64 tenantProfileIdLSB = 2;
int64 apiUsageStateIdMSB = 3;
int64 apiUsageStateIdLSB = 4;
int64 createdTime = 5;
EntityTypeProto entityType = 6;
int64 entityIdMSB = 7;
int64 entityIdLSB = 8;
string transportState = 9;
string dbStorageState = 10;
string reExecState = 11;
string jsExecState = 12;
string tbelExecState = 13;
string emailExecState = 14;
string smsExecState = 15;
string alarmExecState = 16;
}
message RepositorySettingsProto {
string repositoryUri = 1;
string authMethod = 2;
optional string username = 3;
optional string password = 4;
optional string privateKeyFileName = 5;
optional string privateKey = 6;
optional string privateKeyPassword = 7;
optional string defaultBranch = 8;
bool readOnly = 9;
bool showMergeCommits = 10;
}
/**
@ -295,7 +376,7 @@ message ValidateBasicMqttCredRequestMsg {
message ValidateDeviceCredentialsResponseMsg {
DeviceInfoProto deviceInfo = 1;
string credentialsBody = 2;
bytes profileBody = 3;
DeviceProfileProto deviceProfile = 3;
}
message GetOrCreateDeviceFromGatewayRequestMsg {
@ -307,7 +388,7 @@ message GetOrCreateDeviceFromGatewayRequestMsg {
message GetOrCreateDeviceFromGatewayResponseMsg {
DeviceInfoProto deviceInfo = 1;
bytes profileBody = 2;
DeviceProfileProto deviceProfile = 2;
TransportApiRequestErrorCode error = 3;
}
@ -390,7 +471,7 @@ message GetResourceRequestMsg {
}
message GetResourceResponseMsg {
bytes resource = 1;
TbResourceProto resource = 1;
}
message ValidateDeviceLwM2MCredentialsRequestMsg {
@ -418,8 +499,11 @@ message GetDeviceProfileRequestMsg {
message GetEntityProfileResponseMsg {
string entityType = 1;
bytes data = 2;
bytes apiState = 3;
oneof data {
TenantProfileProto tenantProfile = 2;
DeviceProfileProto deviceProfile = 3;
}
ApiUsageStateProto apiState = 4;
}
message GetDeviceRequestMsg {
@ -430,6 +514,7 @@ message GetDeviceRequestMsg {
message GetDeviceResponseMsg {
int64 deviceProfileIdMSB = 1;
int64 deviceProfileIdLSB = 2;
//Json
bytes deviceTransportConfiguration = 3;
}
@ -439,7 +524,7 @@ message GetDeviceCredentialsRequestMsg {
}
message GetDeviceCredentialsResponseMsg {
bytes deviceCredentialsData = 1;
DeviceCredentialsProto deviceCredentialsData = 1;
}
message GetSnmpDevicesRequestMsg {
@ -453,8 +538,13 @@ message GetSnmpDevicesResponseMsg {
}
message EntityUpdateMsg {
string entityType = 1;
bytes data = 2;
oneof entityUpdate {
TenantProto tenant = 1;
TenantProfileProto tenantProfile = 2;
DeviceProto device = 3;
DeviceProfileProto deviceProfile = 4;
ApiUsageStateProto apiUsageState = 5;
}
}
message EntityDeleteMsg {
@ -1242,7 +1332,7 @@ message ToVersionControlServiceMsg {
int64 tenantIdLSB = 3;
int64 requestIdMSB = 4;
int64 requestIdLSB = 5;
bytes vcSettings = 6;
RepositorySettingsProto vcSettings = 6;
GenericRepositoryRequestMsg initRepositoryRequest = 7;
GenericRepositoryRequestMsg testRepositoryRequest = 8;
GenericRepositoryRequestMsg clearRepositoryRequest = 9;
@ -1326,21 +1416,17 @@ message ToCoreMsg {
message ToCoreNotificationMsg {
LocalSubscriptionServiceMsgProto toLocalSubscriptionServiceMsg = 1;
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
bytes componentLifecycleMsg = 3 [deprecated = true];
bytes edgeEventUpdateMsg = 4 [deprecated = true];
QueueUpdateMsg queueUpdateMsg = 5;
QueueDeleteMsg queueDeleteMsg = 6;
VersionControlResponseMsg vcResponseMsg = 7;
bytes toEdgeSyncRequestMsg = 8 [deprecated = true];
bytes fromEdgeSyncResponseMsg = 9 [deprecated = true];
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 10;
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 11;
ComponentLifecycleMsgProto componentLifecycle = 12;
CoreStartupMsg coreStartupMsg = 13;
EdgeEventUpdateMsgProto edgeEventUpdate = 14;
ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 15;
FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 16;
ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 17;
QueueUpdateMsg queueUpdateMsg = 3;
QueueDeleteMsg queueDeleteMsg = 4;
VersionControlResponseMsg vcResponseMsg = 5;
SubscriptionMgrMsgProto toSubscriptionMgrMsg = 6;
NotificationRuleProcessorMsg notificationRuleProcessorMsg = 7;
ComponentLifecycleMsgProto componentLifecycle = 8;
CoreStartupMsg coreStartupMsg = 9;
EdgeEventUpdateMsgProto edgeEventUpdate = 10;
ToEdgeSyncRequestMsgProto toEdgeSyncRequest = 11;
FromEdgeSyncResponseMsgProto fromEdgeSyncResponse = 12;
ResourceCacheInvalidateMsg resourceCacheInvalidateMsg = 13;
}
/* Messages that are handled by ThingsBoard RuleEngine Service */
@ -1353,7 +1439,6 @@ message ToRuleEngineMsg {
}
message ToRuleEngineNotificationMsg {
bytes componentLifecycleMsg = 1 [deprecated = true];
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
QueueUpdateMsg queueUpdateMsg = 3;
QueueDeleteMsg queueDeleteMsg = 4;
@ -1416,6 +1501,7 @@ message NotificationSchedulerServiceMsg {
}
message NotificationRuleProcessorMsg {
//Json
bytes trigger = 1;
}

View File

@ -20,16 +20,16 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.notification.rule.trigger.NotificationRuleTrigger;
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import java.util.UUID;
@ -43,7 +43,6 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso
private final TbQueueProducerProvider producerProvider;
private final TopicService topicService;
private final PartitionService partitionService;
private final DataDecodingEncodingService encodingService;
@Override
public void process(NotificationRuleTrigger trigger) {
@ -54,7 +53,7 @@ public class RemoteNotificationRuleProcessor implements NotificationRuleProcesso
log.debug("Submitting notification rule trigger: {}", trigger);
TransportProtos.NotificationRuleProcessorMsg.Builder msg = TransportProtos.NotificationRuleProcessorMsg.newBuilder()
.setTrigger(ByteString.copyFrom(encodingService.encode(trigger)));
.setTrigger(ByteString.copyFrom(JacksonUtil.writeValueAsBytes(trigger)));
partitionService.getAllServiceIds(ServiceType.TB_CORE).stream().findAny().ifPresent(serviceId -> {
TopicPartitionInfo tpi = topicService.getNotificationsTopic(ServiceType.TB_CORE, serviceId);

View File

@ -1,27 +0,0 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.util;
import java.util.Optional;
public interface DataDecodingEncodingService {
<T> Optional<T> decode(byte[] byteArray);
<T> byte[] encode(T msq);
}

View File

@ -1,63 +0,0 @@
/**
* Copyright © 2016-2023 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.util;
import lombok.extern.slf4j.Slf4j;
import org.nustaq.serialization.FSTConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.FSTUtils;
import org.thingsboard.server.common.data.FstStatsService;
import java.util.Optional;
@Slf4j
@Service
public class ProtoWithFSTService implements DataDecodingEncodingService {
@Autowired
private FstStatsService fstStatsService;
public static final FSTConfiguration CONFIG = FSTConfiguration.createDefaultConfiguration();
@Override
public <T> Optional<T> decode(byte[] byteArray) {
try {
long startTime = System.nanoTime();
Optional<T> optional = Optional.ofNullable(FSTUtils.decode(byteArray));
optional.ifPresent(obj -> {
fstStatsService.recordDecodeTime(obj.getClass(), startTime);
fstStatsService.incrementDecode(obj.getClass());
});
return optional;
} catch (IllegalArgumentException e) {
log.error("Error during deserialization message, [{}]", e.getMessage());
return Optional.empty();
}
}
@Override
public <T> byte[] encode(T msq) {
long startTime = System.nanoTime();
var bytes = FSTUtils.encode(msq);
fstStatsService.recordEncodeTime(msq.getClass(), startTime);
fstStatsService.incrementEncode(msq.getClass());
return bytes;
}
}

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.transport.snmp.service;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.device.data.DeviceData;
import org.thingsboard.server.common.data.device.data.DeviceTransportConfiguration;
@ -24,8 +25,8 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.security.DeviceCredentials;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbSnmpTransportComponent;
import java.util.UUID;
@ -35,7 +36,6 @@ import java.util.UUID;
@RequiredArgsConstructor
public class ProtoTransportEntityService {
private final TransportService transportService;
private final DataDecodingEncodingService dataDecodingEncodingService;
public Device getDeviceById(DeviceId id) {
TransportProtos.GetDeviceResponseMsg deviceProto = transportService.getDevice(TransportProtos.GetDeviceRequestMsg.newBuilder()
@ -55,9 +55,8 @@ public class ProtoTransportEntityService {
device.setId(id);
device.setDeviceProfileId(deviceProfileId);
DeviceTransportConfiguration deviceTransportConfiguration = (DeviceTransportConfiguration) dataDecodingEncodingService.decode(
deviceProto.getDeviceTransportConfiguration().toByteArray()
).orElseThrow(() -> new IllegalStateException("Can't find device transport configuration"));
DeviceTransportConfiguration deviceTransportConfiguration = JacksonUtil.fromBytes(
deviceProto.getDeviceTransportConfiguration().toByteArray(), DeviceTransportConfiguration.class);
DeviceData deviceData = new DeviceData();
deviceData.setTransportConfiguration(deviceTransportConfiguration);
@ -74,8 +73,11 @@ public class ProtoTransportEntityService {
.build()
);
return (DeviceCredentials) dataDecodingEncodingService.decode(deviceCredentialsResponse.getDeviceCredentialsData().toByteArray())
.orElseThrow(() -> new IllegalArgumentException("Device credentials not found"));
if (deviceCredentialsResponse.hasDeviceCredentialsData()) {
return ProtoUtils.fromProto(deviceCredentialsResponse.getDeviceCredentialsData());
} else {
throw new IllegalArgumentException("Device credentials not found");
}
}
public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(int page, int pageSize) {

View File

@ -15,19 +15,19 @@
*/
package org.thingsboard.server.common.transport;
import com.google.protobuf.ByteString;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.gen.transport.TransportProtos;
public interface TransportDeviceProfileCache {
DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody);
DeviceProfile getOrCreate(DeviceProfileId id, TransportProtos.DeviceProfileProto proto);
DeviceProfile get(DeviceProfileId id);
void put(DeviceProfile profile);
DeviceProfile put(ByteString profileBody);
DeviceProfile put(TransportProtos.DeviceProfileProto proto);
void evict(DeviceProfileId id);

View File

@ -15,11 +15,11 @@
*/
package org.thingsboard.server.common.transport;
import com.google.protobuf.ByteString;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.Set;
@ -27,7 +27,7 @@ public interface TransportTenantProfileCache {
TenantProfile get(TenantId tenantId);
TenantProfileUpdateResult put(ByteString profileBody);
TenantProfileUpdateResult put(TransportProtos.TenantProfileProto proto);
boolean put(TenantId tenantId, TenantProfileId profileId);

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.common.transport.service;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
@ -25,11 +24,10 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbTransportComponent;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
@ -42,7 +40,6 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil
private final Lock deviceProfileFetchLock = new ReentrantLock();
private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfiles = new ConcurrentHashMap<>();
private final DataDecodingEncodingService dataDecodingEncodingService;
private TransportService transportService;
@ -52,19 +49,12 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil
this.transportService = transportService;
}
public DefaultTransportDeviceProfileCache(DataDecodingEncodingService dataDecodingEncodingService) {
this.dataDecodingEncodingService = dataDecodingEncodingService;
}
@Override
public DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody) {
public DeviceProfile getOrCreate(DeviceProfileId id, TransportProtos.DeviceProfileProto proto) {
DeviceProfile profile = deviceProfiles.get(id);
if (profile == null) {
Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
if (deviceProfile.isPresent()) {
profile = deviceProfile.get();
deviceProfiles.put(id, profile);
}
profile = ProtoUtils.fromProto(proto);
deviceProfiles.put(id, profile);
}
return profile;
}
@ -80,14 +70,10 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil
}
@Override
public DeviceProfile put(ByteString profileBody) {
Optional<DeviceProfile> deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray());
if (deviceProfile.isPresent()) {
put(deviceProfile.get());
return deviceProfile.get();
} else {
return null;
}
public DeviceProfile put(TransportProtos.DeviceProfileProto proto) {
DeviceProfile deviceProfile = ProtoUtils.fromProto(proto);
put(deviceProfile);
return deviceProfile;
}
@Override
@ -107,14 +93,8 @@ public class DefaultTransportDeviceProfileCache implements TransportDeviceProfil
.setEntityIdLSB(id.getId().getLeastSignificantBits())
.build();
TransportProtos.GetEntityProfileResponseMsg entityProfileMsg = transportService.getEntityProfile(msg);
Optional<DeviceProfile> profileOpt = dataDecodingEncodingService.decode(entityProfileMsg.getData().toByteArray());
if (profileOpt.isPresent()) {
profile = profileOpt.get();
this.put(profile);
} else {
log.warn("[{}] Can't find device profile: {}", id, entityProfileMsg.getData());
throw new RuntimeException("Can't find device profile!");
}
profile = ProtoUtils.fromProto(entityProfileMsg.getDeviceProfile());
this.put(profile);
} finally {
deviceProfileFetchLock.unlock();
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.common.transport.service;
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
@ -24,8 +25,8 @@ import org.thingsboard.server.common.data.TbResource;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.transport.TransportResourceCache;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbTransportComponent;
import java.util.Optional;
@ -39,19 +40,15 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Component
@TbTransportComponent
@RequiredArgsConstructor
public class DefaultTransportResourceCache implements TransportResourceCache {
private final Lock resourceFetchLock = new ReentrantLock();
private final ConcurrentMap<ResourceCompositeKey, TbResource> resources = new ConcurrentHashMap<>();
private final Set<ResourceCompositeKey> keys = ConcurrentHashMap.newKeySet();
private final DataDecodingEncodingService dataDecodingEncodingService;
@Lazy
private final TransportService transportService;
public DefaultTransportResourceCache(DataDecodingEncodingService dataDecodingEncodingService, @Lazy TransportService transportService) {
this.dataDecodingEncodingService = dataDecodingEncodingService;
this.transportService = transportService;
}
@Override
public Optional<TbResource> get(TenantId tenantId, ResourceType resourceType, String resourceKey) {
ResourceCompositeKey compositeKey = new ResourceCompositeKey(tenantId, resourceType, resourceKey);
@ -92,9 +89,8 @@ public class DefaultTransportResourceCache implements TransportResourceCache {
.setResourceKey(compositeKey.resourceKey);
TransportProtos.GetResourceResponseMsg responseMsg = transportService.getResource(builder.build());
Optional<TbResource> optionalResource = dataDecodingEncodingService.decode(responseMsg.getResource().toByteArray());
if (optionalResource.isPresent()) {
TbResource resource = optionalResource.get();
if (responseMsg.hasResource()) {
TbResource resource = ProtoUtils.fromProto(responseMsg.getResource());
resources.put(new ResourceCompositeKey(resource.getTenantId(), resource.getResourceType(), resource.getResourceKey()), resource);
return resource;
}

View File

@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
@ -50,9 +49,9 @@ import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.limit.LimitedApi;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.rpc.RpcStatus;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
@ -80,6 +79,7 @@ import org.thingsboard.server.common.transport.limits.EntityLimitKey;
import org.thingsboard.server.common.transport.limits.EntityLimitsCache;
import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
import org.thingsboard.server.common.transport.util.JsonUtils;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
@ -96,14 +96,13 @@ import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbTransportComponent;
import javax.annotation.PostConstruct;
@ -163,7 +162,6 @@ public class DefaultTransportService implements TransportService {
@Value("${transport.stats.enabled:false}")
private boolean statsEnabled;
@Autowired
@Lazy
private TbApiUsageReportClient apiUsageClient;
@ -181,7 +179,6 @@ public class DefaultTransportService implements TransportService {
private final TransportTenantProfileCache tenantProfileCache;
private final TransportRateLimitService rateLimitService;
private final DataDecodingEncodingService dataDecodingEncodingService;
private final SchedulerComponent scheduler;
private final ApplicationEventPublisher eventPublisher;
private final TransportResourceCache transportResourceCache;
@ -216,7 +213,7 @@ public class DefaultTransportService implements TransportService {
TransportDeviceProfileCache deviceProfileCache,
TransportTenantProfileCache tenantProfileCache,
TransportRateLimitService rateLimitService,
DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler, TransportResourceCache transportResourceCache,
SchedulerComponent scheduler, TransportResourceCache transportResourceCache,
ApplicationEventPublisher eventPublisher, NotificationRuleProcessor notificationRuleProcessor,
EntityLimitsCache entityLimitsCache) {
this.partitionService = partitionService;
@ -228,7 +225,6 @@ public class DefaultTransportService implements TransportService {
this.deviceProfileCache = deviceProfileCache;
this.tenantProfileCache = tenantProfileCache;
this.rateLimitService = rateLimitService;
this.dataDecodingEncodingService = dataDecodingEncodingService;
this.scheduler = scheduler;
this.transportResourceCache = transportResourceCache;
this.eventPublisher = eventPublisher;
@ -430,9 +426,8 @@ public class DefaultTransportService implements TransportService {
result.credentials(msg.getCredentialsBody());
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (!profileBody.isEmpty()) {
DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody);
if (msg.hasDeviceProfile()) {
DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), msg.getDeviceProfile());
result.deviceProfile(profile);
}
}
@ -464,9 +459,8 @@ public class DefaultTransportService implements TransportService {
result.credentials(msg.getCredentialsBody());
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (!profileBody.isEmpty()) {
DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody);
if (msg.hasDeviceProfile()) {
DeviceProfile profile = deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), msg.getDeviceProfile());
if (transportType != DeviceTransportType.DEFAULT
&& profile != null && profile.getTransportType() != DeviceTransportType.DEFAULT && profile.getTransportType() != transportType) {
log.debug("[{}] Device profile [{}] has different transport type: {}, expected: {}", tdi.getDeviceId(), tdi.getDeviceProfileId(), profile.getTransportType(), transportType);
@ -480,7 +474,6 @@ public class DefaultTransportService implements TransportService {
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
}
@Override
public void process(TenantId tenantId, TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg requestMsg, TransportServiceCallback<GetOrCreateDeviceFromGatewayResponse> callback) {
TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setGetOrCreateDeviceRequestMsg(requestMsg).build());
@ -495,9 +488,8 @@ public class DefaultTransportService implements TransportService {
if (msg.hasDeviceInfo()) {
TransportDeviceInfo tdi = getTransportDeviceInfo(msg.getDeviceInfo());
result.deviceInfo(tdi);
ByteString profileBody = msg.getProfileBody();
if (!profileBody.isEmpty()) {
result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody));
if (msg.hasDeviceProfile()) {
result.deviceProfile(deviceProfileCache.getOrCreate(tdi.getDeviceProfileId(), msg.getDeviceProfile()));
}
} else if (TransportProtos.TransportApiRequestErrorCode.ENTITY_LIMIT.equals(msg.getError())) {
entityLimitsCache.put(key, true);
@ -987,37 +979,7 @@ public class DefaultTransportService implements TransportService {
} else {
log.trace("Processing broadcast notification: {}", toSessionMsg);
if (toSessionMsg.hasEntityUpdateMsg()) {
TransportProtos.EntityUpdateMsg msg = toSessionMsg.getEntityUpdateMsg();
EntityType entityType = EntityType.valueOf(msg.getEntityType());
if (EntityType.DEVICE_PROFILE.equals(entityType)) {
DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData());
if (deviceProfile != null) {
log.debug("On device profile update: {}", deviceProfile);
onProfileUpdate(deviceProfile);
}
} else if (EntityType.TENANT_PROFILE.equals(entityType)) {
rateLimitService.update(tenantProfileCache.put(msg.getData()));
} else if (EntityType.TENANT.equals(entityType)) {
Optional<Tenant> profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
if (profileOpt.isPresent()) {
Tenant tenant = profileOpt.get();
partitionService.removeTenant(tenant.getId());
boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId());
if (updated) {
rateLimitService.update(tenant.getId());
}
}
} else if (EntityType.API_USAGE_STATE.equals(entityType)) {
Optional<ApiUsageState> stateOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
if (stateOpt.isPresent()) {
ApiUsageState apiUsageState = stateOpt.get();
rateLimitService.update(apiUsageState.getTenantId(), apiUsageState.isTransportEnabled());
//TODO: if transport is disabled, we should close all sessions and not to check credentials.
}
} else if (EntityType.DEVICE.equals(entityType)) {
Optional<Device> deviceOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
deviceOpt.ifPresent(this::onDeviceUpdate);
}
onEntityUpdate(toSessionMsg.getEntityUpdateMsg());
} else if (toSessionMsg.hasEntityDeleteMsg()) {
TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg();
EntityType entityType = EntityType.valueOf(msg.getEntityType());
@ -1086,6 +1048,37 @@ public class DefaultTransportService implements TransportService {
eventPublisher.publishEvent(new DeviceProfileUpdatedEvent(deviceProfile));
}
private void onEntityUpdate(TransportProtos.EntityUpdateMsg msg) {
switch (msg.getEntityUpdateCase()) {
case DEVICEPROFILE:
DeviceProfile deviceProfile = deviceProfileCache.put(msg.getDeviceProfile());
log.debug("On device profile update: {}", deviceProfile);
onProfileUpdate(deviceProfile);
break;
case TENANTPROFILE:
rateLimitService.update(tenantProfileCache.put(msg.getTenantProfile()));
break;
case TENANT:
Tenant tenant = ProtoUtils.fromProto(msg.getTenant());
partitionService.removeTenant(tenant.getId());
boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId());
if (updated) {
rateLimitService.update(tenant.getId());
}
break;
case APIUSAGESTATE:
ApiUsageState apiUsageState = ProtoUtils.fromProto(msg.getApiUsageState());
rateLimitService.update(apiUsageState.getTenantId(), apiUsageState.isTransportEnabled());
//TODO: if transport is disabled, we should close all sessions and not to check credentials.
break;
case DEVICE:
onDeviceUpdate(ProtoUtils.fromProto(msg.getDevice()));
break;
default:
log.warn("UNKNOWN entity update type: [{}]", msg.getEntityUpdateCase());
}
}
private void onDeviceUpdate(Device device) {
long deviceIdMSB = device.getId().getId().getMostSignificantBits();
long deviceIdLSB = device.getId().getId().getLeastSignificantBits();

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.common.transport.service;
import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
@ -29,12 +28,11 @@ import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportTenantProfileCache;
import org.thingsboard.server.common.transport.limits.TransportRateLimitService;
import org.thingsboard.server.common.transport.profile.TenantProfileUpdateResult;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbTransportComponent;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -50,7 +48,6 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
private final ConcurrentMap<TenantProfileId, TenantProfile> profiles = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantId, TenantProfileId> tenantIds = new ConcurrentHashMap<>();
private final ConcurrentMap<TenantProfileId, Set<TenantId>> tenantProfileIds = new ConcurrentHashMap<>();
private final DataDecodingEncodingService dataDecodingEncodingService;
private TransportRateLimitService rateLimitService;
private TransportService transportService;
@ -67,28 +64,18 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
this.transportService = transportService;
}
public DefaultTransportTenantProfileCache(DataDecodingEncodingService dataDecodingEncodingService) {
this.dataDecodingEncodingService = dataDecodingEncodingService;
}
@Override
public TenantProfile get(TenantId tenantId) {
return getTenantProfile(tenantId);
}
@Override
public TenantProfileUpdateResult put(ByteString profileBody) {
Optional<TenantProfile> profileOpt = dataDecodingEncodingService.decode(profileBody.toByteArray());
if (profileOpt.isPresent()) {
TenantProfile newProfile = profileOpt.get();
log.trace("[{}] put: {}", newProfile.getId(), newProfile);
profiles.put(newProfile.getId(), newProfile);
Set<TenantId> affectedTenants = tenantProfileIds.get(newProfile.getId());
return new TenantProfileUpdateResult(newProfile, affectedTenants != null ? affectedTenants : Collections.emptySet());
} else {
log.warn("Failed to decode profile: {}", profileBody.toString());
return new TenantProfileUpdateResult(null, Collections.emptySet());
}
public TenantProfileUpdateResult put(TransportProtos.TenantProfileProto proto) {
TenantProfile profile = ProtoUtils.fromProto(proto);
log.trace("[{}] put: {}", profile.getId(), profile);
profiles.put(profile.getId(), profile);
Set<TenantId> affectedTenants = tenantProfileIds.get(profile.getId());
return new TenantProfileUpdateResult(profile, affectedTenants != null ? affectedTenants : Collections.emptySet());
}
@Override
@ -135,23 +122,17 @@ public class DefaultTransportTenantProfileCache implements TransportTenantProfil
.setEntityIdLSB(tenantId.getId().getLeastSignificantBits())
.build();
TransportProtos.GetEntityProfileResponseMsg entityProfileMsg = transportService.getEntityProfile(msg);
Optional<TenantProfile> profileOpt = dataDecodingEncodingService.decode(entityProfileMsg.getData().toByteArray());
if (profileOpt.isPresent()) {
profile = profileOpt.get();
TenantProfile existingProfile = profiles.get(profile.getId());
if (existingProfile != null) {
profile = existingProfile;
} else {
profiles.put(profile.getId(), profile);
}
tenantProfileIds.computeIfAbsent(profile.getId(), id -> ConcurrentHashMap.newKeySet()).add(tenantId);
tenantIds.put(tenantId, profile.getId());
profile = ProtoUtils.fromProto(entityProfileMsg.getTenantProfile());
TenantProfile existingProfile = profiles.get(profile.getId());
if (existingProfile != null) {
profile = existingProfile;
} else {
log.warn("[{}] Can't decode tenant profile: {}", tenantId, entityProfileMsg.getData());
throw new RuntimeException("Can't decode tenant profile!");
profiles.put(profile.getId(), profile);
}
Optional<ApiUsageState> apiStateOpt = dataDecodingEncodingService.decode(entityProfileMsg.getApiState().toByteArray());
apiStateOpt.ifPresent(apiUsageState -> rateLimitService.update(tenantId, apiUsageState.isTransportEnabled()));
tenantProfileIds.computeIfAbsent(profile.getId(), id -> ConcurrentHashMap.newKeySet()).add(tenantId);
tenantIds.put(tenantId, profile.getId());
ApiUsageState apiUsageState = ProtoUtils.fromProto(entityProfileMsg.getApiState());
rateLimitService.update(tenantId, apiUsageState.isTransportEnabled());
}
} finally {
tenantProfileFetchLock.unlock();

View File

@ -36,12 +36,12 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.sync.vc.RepositorySettings;
import org.thingsboard.server.common.data.sync.vc.VersionCreationResult;
import org.thingsboard.server.common.data.sync.vc.VersionedEntityInfo;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.util.ProtoUtils;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.AddMsg;
import org.thingsboard.server.gen.transport.TransportProtos.BranchInfoProto;
@ -67,13 +67,12 @@ import org.thingsboard.server.gen.transport.TransportProtos.VersionedEntityInfoP
import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbVersionControlQueueFactory;
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
import org.thingsboard.server.queue.util.TbVersionControlComponent;
import javax.annotation.PostConstruct;
@ -108,7 +107,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
private final PartitionService partitionService;
private final TbQueueProducerProvider producerProvider;
private final TbVersionControlQueueFactory queueFactory;
private final DataDecodingEncodingService encodingService;
private final GitRepositoryService vcService;
private final TopicService topicService;
@ -196,7 +194,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
}
for (TbProtoQueueMsg<ToVersionControlServiceMsg> msgWrapper : msgs) {
ToVersionControlServiceMsg msg = msgWrapper.getValue();
var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : getEntitiesVersionControlSettings(msg));
var ctx = new VersionControlRequestCtx(msg, msg.hasClearRepositoryRequest() ? null : ProtoUtils.fromProto(msg.getVcSettings()));
long startTs = System.currentTimeMillis();
log.trace("[{}][{}] RECEIVED task: {}", ctx.getTenantId(), ctx.getRequestId(), msg);
int threadIdx = Math.abs(ctx.getTenantId().hashCode() % ioPoolSize);
@ -542,16 +540,6 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
producer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
}
private RepositorySettings getEntitiesVersionControlSettings(ToVersionControlServiceMsg msg) {
Optional<RepositorySettings> settingsOpt = encodingService.decode(msg.getVcSettings().toByteArray());
if (settingsOpt.isPresent()) {
return settingsOpt.get();
} else {
log.warn("Failed to parse VC settings: {}", msg.getVcSettings());
throw new RuntimeException("Failed to parse vc settings!");
}
}
private String getRelativePath(EntityType entityType, String entityId) {
String path = entityType.name().toLowerCase();
if (entityId != null) {

View File

@ -37,13 +37,13 @@ public class DeviceProfileRedisCache extends RedisTbTransactionalCache<DevicePro
super(CacheConstants.DEVICE_PROFILE_CACHE, cacheSpecsMap, connectionFactory, configuration, new TbRedisSerializer<DeviceProfileCacheKey, DeviceProfile>() {
@Override
public byte[] serialize(DeviceProfile deviceProfile) throws SerializationException {
return ProtoUtils.toDeviceProfileProto(deviceProfile).toByteArray();
return ProtoUtils.toProto(deviceProfile).toByteArray();
}
@Override
public DeviceProfile deserialize(DeviceProfileCacheKey key, byte[] bytes) throws SerializationException {
try {
return ProtoUtils.fromDeviceProfileProto(TransportProtos.DeviceProfileProto.parseFrom(bytes));
return ProtoUtils.fromProto(TransportProtos.DeviceProfileProto.parseFrom(bytes));
} catch (InvalidProtocolBufferException e) {
throw new SerializationException(e.getMessage());
}