From 0245ff24a8151dc5a39452cde2402ec74b09a0c1 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 4 Sep 2020 15:35:50 +0300 Subject: [PATCH] Transport Profile cache --- .../transport/TransportProfileCache.java | 35 +++++++++ .../service/DefaultTransportProfileCache.java | 77 +++++++++++++++++++ .../service/DefaultTransportService.java | 73 +++++------------- 3 files changed, 130 insertions(+), 55 deletions(-) create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java new file mode 100644 index 0000000000..f34b76ade9 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport; + +import com.google.protobuf.ByteString; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceProfileId; + +import java.util.Optional; + +public interface TransportProfileCache { + + + DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody); + + DeviceProfile get(DeviceProfileId id); + + void put(DeviceProfile profile); + + DeviceProfile put(ByteString profileBody); + +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java new file mode 100644 index 0000000000..afa8e15e20 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java @@ -0,0 +1,77 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.service; + +import com.google.protobuf.ByteString; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.transport.TransportProfileCache; +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +@Slf4j +@Component +@ConditionalOnExpression("('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport'") +public class DefaultTransportProfileCache implements TransportProfileCache { + + private final ConcurrentMap deviceProfiles = new ConcurrentHashMap<>(); + + private final DataDecodingEncodingService dataDecodingEncodingService; + + public DefaultTransportProfileCache(DataDecodingEncodingService dataDecodingEncodingService) { + this.dataDecodingEncodingService = dataDecodingEncodingService; + } + + @Override + public DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody) { + DeviceProfile profile = deviceProfiles.get(id); + if (profile == null) { + Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); + if (deviceProfile.isPresent()) { + profile = deviceProfile.get(); + deviceProfiles.put(id, profile); + } + } + return profile; + } + + @Override + public DeviceProfile get(DeviceProfileId id) { + return deviceProfiles.get(id); + } + + @Override + public void put(DeviceProfile profile) { + deviceProfiles.put(profile.getId(), profile); + } + + @Override + public DeviceProfile put(ByteString profileBody) { + Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); + if (deviceProfile.isPresent()) { + put(deviceProfile.get()); + return deviceProfile.get(); + } else { + return null; + } + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 3bf9d78b48..b572835971 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.msg.tools.TbRateLimits; import org.thingsboard.server.common.msg.tools.TbRateLimitsException; import org.thingsboard.server.common.transport.SessionMsgListener; +import org.thingsboard.server.common.transport.TransportProfileCache; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.auth.GetOrCreateDeviceFromGatewayResponse; @@ -121,8 +122,7 @@ public class DefaultTransportService implements TransportService { private final PartitionService partitionService; private final TbServiceInfoProvider serviceInfoProvider; private final StatsFactory statsFactory; - private final DataDecodingEncodingService dataDecodingEncodingService; - + private final TransportProfileCache transportProfileCache; protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; protected TbQueueProducer> ruleEngineMsgProducer; @@ -141,7 +141,6 @@ public class DefaultTransportService implements TransportService { //TODO 3.2: @ybondarenko Implement cleanup of this maps. private final ConcurrentMap perTenantLimits = new ConcurrentHashMap<>(); private final ConcurrentMap perDeviceLimits = new ConcurrentHashMap<>(); - private final ConcurrentMap deviceProfiles = new ConcurrentHashMap<>(); private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); private volatile boolean stopped = false; @@ -151,13 +150,13 @@ public class DefaultTransportService implements TransportService { TbQueueProducerProvider producerProvider, PartitionService partitionService, StatsFactory statsFactory, - DataDecodingEncodingService dataDecodingEncodingService) { + TransportProfileCache transportProfileCache) { this.serviceInfoProvider = serviceInfoProvider; this.queueProvider = queueProvider; this.producerProvider = producerProvider; this.partitionService = partitionService; this.statsFactory = statsFactory; - this.dataDecodingEncodingService = dataDecodingEncodingService; + this.transportProfileCache = transportProfileCache; } @PostConstruct @@ -276,14 +275,7 @@ public class DefaultTransportService implements TransportService { result.deviceInfo(tdi); ByteString profileBody = msg.getProfileBody(); if (profileBody != null && !profileBody.isEmpty()) { - DeviceProfile profile = deviceProfiles.get(tdi.getDeviceProfileId()); - if (profile == null) { - Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); - if (deviceProfile.isPresent()) { - profile = deviceProfile.get(); - deviceProfiles.put(tdi.getDeviceProfileId(), profile); - } - } + DeviceProfile profile = transportProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody); if (transportType != DeviceTransportType.DEFAULT && profile != null && profile.getTransportType() != DeviceTransportType.DEFAULT && profile.getTransportType() != transportType) { log.debug("[{}] Device profile [{}] has different transport type: {}, expected: {}", tdi.getDeviceId(), tdi.getDeviceProfileId(), profile.getTransportType(), transportType); @@ -309,15 +301,7 @@ public class DefaultTransportService implements TransportService { result.deviceInfo(tdi); ByteString profileBody = msg.getProfileBody(); if (profileBody != null && !profileBody.isEmpty()) { - DeviceProfile profile = deviceProfiles.get(tdi.getDeviceProfileId()); - if (profile == null) { - Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); - if (deviceProfile.isPresent()) { - profile = deviceProfile.get(); - deviceProfiles.put(tdi.getDeviceProfileId(), profile); - } - } - result.deviceProfile(profile); + result.deviceProfile(transportProfileCache.getOrCreate(tdi.getDeviceProfileId(), profileBody)); } } return result.build(); @@ -629,8 +613,10 @@ public class DefaultTransportService implements TransportService { } } else { if (toSessionMsg.hasDeviceProfileUpdateMsg()) { - Optional deviceProfile = dataDecodingEncodingService.decode(toSessionMsg.getDeviceProfileUpdateMsg().getData().toByteArray()); - deviceProfile.ifPresent(this::onProfileUpdate); + DeviceProfile deviceProfile = transportProfileCache.put(toSessionMsg.getDeviceProfileUpdateMsg().getData()); + if (deviceProfile != null) { + onProfileUpdate(deviceProfile); + } } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId); @@ -640,7 +626,7 @@ public class DefaultTransportService implements TransportService { @Override public void getDeviceProfile(DeviceProfileId deviceProfileId, TransportServiceCallback callback) { - DeviceProfile deviceProfile = deviceProfiles.get(deviceProfileId); + DeviceProfile deviceProfile = transportProfileCache.get(deviceProfileId); if (deviceProfile != null) { callback.onSuccess(deviceProfile); } else { @@ -653,14 +639,13 @@ public class DefaultTransportService implements TransportService { TransportApiRequestMsg.newBuilder().setGetDeviceProfileRequestMsg(msg).build()); AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> { - byte[] devProfileBody = response.getValue().getGetDeviceProfileResponseMsg().getData().toByteArray(); - if (devProfileBody != null && devProfileBody.length > 0) { - Optional deviceProfileOpt = dataDecodingEncodingService.decode(devProfileBody); - if (deviceProfileOpt.isPresent()) { - deviceProfiles.put(deviceProfileOpt.get().getId(), deviceProfile); - callback.onSuccess(deviceProfileOpt.get()); + ByteString devProfileBody = response.getValue().getGetDeviceProfileResponseMsg().getData(); + if (devProfileBody != null && !devProfileBody.isEmpty()) { + DeviceProfile profile = transportProfileCache.put(devProfileBody); + if (profile != null) { + callback.onSuccess(profile); } else { - log.warn("Failed to decode device profile: {}", Arrays.toString(devProfileBody)); + log.warn("Failed to decode device profile: {}", devProfileBody); callback.onError(new IllegalArgumentException("Failed to decode device profile!")); } } else { @@ -673,7 +658,6 @@ public class DefaultTransportService implements TransportService { @Override public void onProfileUpdate(DeviceProfile deviceProfile) { - deviceProfiles.put(deviceProfile.getId(), deviceProfile); long deviceProfileIdMSB = deviceProfile.getId().getId().getMostSignificantBits(); long deviceProfileIdLSB = deviceProfile.getId().getId().getLeastSignificantBits(); sessions.forEach((id, md) -> { @@ -736,7 +720,7 @@ public class DefaultTransportService implements TransportService { private RuleChainId resolveRuleChainId(TransportProtos.SessionInfoProto sessionInfo) { DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); - DeviceProfile deviceProfile = deviceProfiles.get(deviceProfileId); + DeviceProfile deviceProfile = transportProfileCache.get(deviceProfileId); RuleChainId ruleChainId; if (deviceProfile == null) { log.warn("[{}] Device profile is null!", deviceProfileId); @@ -747,27 +731,6 @@ public class DefaultTransportService implements TransportService { return ruleChainId; } - private ListenableFuture> extractProfile(ListenableFuture> send, - Function hasDeviceInfo, - Function deviceInfoF, - Function profileBodyF) { - return Futures.transform(send, response -> { - T value = response.getValue(); - if (hasDeviceInfo.apply(value)) { - TransportProtos.DeviceInfoProto deviceInfo = deviceInfoF.apply(value); - ByteString profileBody = profileBodyF.apply(value); - if (profileBody != null && !profileBody.isEmpty()) { - DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceInfo.getDeviceProfileIdMSB(), deviceInfo.getDeviceProfileIdLSB())); - if (!deviceProfiles.containsKey(deviceProfileId)) { - Optional deviceProfile = dataDecodingEncodingService.decode(profileBody.toByteArray()); - deviceProfile.ifPresent(profile -> deviceProfiles.put(deviceProfileId, profile)); - } - } - } - return response; - }, transportCallbackExecutor); - } - private class TransportTbQueueCallback implements TbQueueCallback { private final TransportServiceCallback callback;