From 43fe84a2840cfb25d6b36ffe9ef73ce9473ef71f Mon Sep 17 00:00:00 2001 From: Yevhen Bondarenko <56396344+YevhenBondarenko@users.noreply.github.com> Date: Fri, 12 Mar 2021 13:00:51 +0200 Subject: [PATCH] created TransportResourceCache (#4232) * created TransportResourceCache * return resource by sysadmin if not found by tenant in DefaultTransportApiService --- .../server/controller/ResourceController.java | 4 +- .../service/install/InstallScripts.java | 4 +- .../queue/DefaultTbClusterService.java | 2 +- .../service/queue/TbClusterService.java | 2 +- .../transport/DefaultTransportApiService.java | 50 ++----- .../server/dao/resource/ResourceService.java | 4 +- .../{transport/resource => }/Resource.java | 10 +- .../resource => }/ResourceType.java | 2 +- common/queue/src/main/proto/queue.proto | 18 +-- .../transport/TransportResourceCache.java | 29 ++++ .../common/transport/TransportService.java | 6 +- .../DefaultTransportResourceCache.java | 130 ++++++++++++++++++ .../service/DefaultTransportService.java | 25 +++- .../transport/util/ProtoWithFSTService.java | 6 +- .../dao/model/sql/ResourceCompositeKey.java | 3 +- .../server/dao/model/sql/ResourceEntity.java | 4 +- .../dao/resource/BaseResourceService.java | 4 +- .../server/dao/resource/ResourceDao.java | 4 +- .../dao/sql/resource/ResourceDaoImpl.java | 4 +- 19 files changed, 227 insertions(+), 84 deletions(-) rename common/data/src/main/java/org/thingsboard/server/common/data/{transport/resource => }/Resource.java (83%) rename common/data/src/main/java/org/thingsboard/server/common/data/{transport/resource => }/ResourceType.java (91%) create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportResourceCache.java create mode 100644 common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java diff --git a/application/src/main/java/org/thingsboard/server/controller/ResourceController.java b/application/src/main/java/org/thingsboard/server/controller/ResourceController.java index 8939fa28b7..3eaf797b86 100644 --- a/application/src/main/java/org/thingsboard/server/controller/ResourceController.java +++ b/application/src/main/java/org/thingsboard/server/controller/ResourceController.java @@ -27,8 +27,8 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.dao.resource.ResourceService; import org.thingsboard.server.queue.util.TbCoreComponent; diff --git a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java index 36b85da719..2800f97661 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java +++ b/application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java @@ -28,8 +28,8 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.oauth2.OAuth2ClientRegistrationTemplate; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.widget.WidgetType; import org.thingsboard.server.common.data.widget.WidgetsBundle; import org.thingsboard.server.dao.dashboard.DashboardService; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 3405ab28d8..a2a04ce3bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -34,7 +34,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.transport.resource.Resource; +import org.thingsboard.server.common.data.Resource; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java index c1cafa3379..ae6dca5378 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java @@ -24,7 +24,7 @@ import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; -import org.thingsboard.server.common.data.transport.resource.Resource; +import org.thingsboard.server.common.data.Resource; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.gen.transport.TransportProtos; diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index eea7af582c..b12d71e9f4 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -39,12 +39,11 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.security.DeviceCredentials; import org.thingsboard.server.common.data.security.DeviceCredentialsType; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.msg.EncryptionUtil; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; @@ -65,7 +64,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequ import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetResourcesRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionResponseStatus; import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; @@ -82,14 +81,11 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.state.DeviceStateService; -import java.util.Collections; -import java.util.List; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; /** * Created by ashvayka on 05.10.18. @@ -167,8 +163,8 @@ public class DefaultTransportApiService implements TransportApiService { } else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) { return Futures.transform(handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); - } else if (transportApiRequestMsg.hasResourcesRequestMsg()) { - return Futures.transform(handle(transportApiRequestMsg.getResourcesRequestMsg()), + } else if (transportApiRequestMsg.hasResourceRequestMsg()) { + return Futures.transform(handle(transportApiRequestMsg.getResourceRequestMsg()), value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor()); } return Futures.transform(getEmptyTransportApiResponseFuture(), @@ -366,38 +362,22 @@ public class DefaultTransportApiService implements TransportApiService { return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build()); } - private ListenableFuture handle(GetResourcesRequestMsg requestMsg) { + private ListenableFuture handle(GetResourceRequestMsg requestMsg) { TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB())); - TransportProtos.GetResourcesResponseMsg.Builder builder = TransportProtos.GetResourcesResponseMsg.newBuilder(); - String resourceType = requestMsg.getResourceType(); + ResourceType resourceType = ResourceType.valueOf(requestMsg.getResourceType()); String resourceId = requestMsg.getResourceId(); + TransportProtos.GetResourceResponseMsg.Builder builder = TransportProtos.GetResourceResponseMsg.newBuilder(); + Resource resource = resourceService.getResource(tenantId, resourceType, resourceId); - List resources; - - if (resourceType != null && resourceId != null) { - resources = Collections.singletonList(toProto( - resourceService.getResource(tenantId, ResourceType.valueOf(resourceType), resourceId))); - } else { - //TODO: add page link params to request proto if need or remove this - resources = resourceService.findResourcesByTenantId(tenantId, new PageLink(100)) - .getData() - .stream() - .map(this::toProto) - .collect(Collectors.toList()); + if (resource == null && !tenantId.equals(TenantId.SYS_TENANT_ID)) { + resource = resourceService.getResource(TenantId.SYS_TENANT_ID, resourceType, resourceId); } - builder.addAllResources(resources); - return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourcesResponseMsg(builder).build()); - } + if (resource != null) { + builder.setResource(ByteString.copyFrom(dataDecodingEncodingService.encode(resource))); + } - private TransportProtos.ResourceMsg toProto(Resource resource) { - return TransportProtos.ResourceMsg.newBuilder() - .setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits()) - .setResourceType(resource.getResourceType().name()) - .setResourceId(resource.getResourceId()) - .setValue(resource.getValue()) - .build(); + return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build()); } private ListenableFuture getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) { diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java index d07b090494..e79a2297e1 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/resource/ResourceService.java @@ -18,8 +18,8 @@ package org.thingsboard.server.dao.resource; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; public interface ResourceService { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/transport/resource/Resource.java b/common/data/src/main/java/org/thingsboard/server/common/data/Resource.java similarity index 83% rename from common/data/src/main/java/org/thingsboard/server/common/data/transport/resource/Resource.java rename to common/data/src/main/java/org/thingsboard/server/common/data/Resource.java index c29b704b04..2e7cde8185 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/transport/resource/Resource.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/Resource.java @@ -13,14 +13,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.data.transport.resource; +package org.thingsboard.server.common.data; import lombok.Data; -import org.thingsboard.server.common.data.HasTenantId; import org.thingsboard.server.common.data.id.TenantId; +import java.io.Serializable; + @Data -public class Resource implements HasTenantId { +public class Resource implements HasTenantId, Serializable { + + private static final long serialVersionUID = 7379609705527272306L; + private TenantId tenantId; private ResourceType resourceType; private String resourceId; diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/transport/resource/ResourceType.java b/common/data/src/main/java/org/thingsboard/server/common/data/ResourceType.java similarity index 91% rename from common/data/src/main/java/org/thingsboard/server/common/data/transport/resource/ResourceType.java rename to common/data/src/main/java/org/thingsboard/server/common/data/ResourceType.java index c43f1997da..a9bb27432a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/transport/resource/ResourceType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/ResourceType.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.common.data.transport.resource; +package org.thingsboard.server.common.data; public enum ResourceType { LWM2M_MODEL, JKS, PKCS_12 diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 42f6267d42..0a9e6d7652 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -201,23 +201,15 @@ message LwM2MResponseMsg { LwM2MRegistrationResponseMsg registrationMsg = 1; } -message ResourceMsg { - int64 tenantIdMSB = 1; - int64 tenantIdLSB = 2; - string resourceType = 3; - string resourceId = 4; - string value = 5; -} - -message GetResourcesRequestMsg { +message GetResourceRequestMsg { int64 tenantIdMSB = 1; int64 tenantIdLSB = 2; string resourceType = 3; string resourceId = 4; } -message GetResourcesResponseMsg { - repeated ResourceMsg resources = 1; +message GetResourceResponseMsg { + bytes resource = 1; } message ValidateDeviceLwM2MCredentialsRequestMsg { @@ -558,7 +550,7 @@ message TransportApiRequestMsg { ValidateBasicMqttCredRequestMsg validateBasicMqttCredRequestMsg = 6; ProvisionDeviceRequestMsg provisionDeviceRequestMsg = 7; ValidateDeviceLwM2MCredentialsRequestMsg validateDeviceLwM2MCredentialsRequestMsg = 8; - GetResourcesRequestMsg resourcesRequestMsg = 9; + GetResourceRequestMsg resourceRequestMsg = 9; } /* Response from ThingsBoard Core Service to Transport Service */ @@ -568,7 +560,7 @@ message TransportApiResponseMsg { GetEntityProfileResponseMsg entityProfileResponseMsg = 3; ProvisionDeviceResponseMsg provisionDeviceResponseMsg = 4; LwM2MResponseMsg lwM2MResponseMsg = 6; - GetResourcesResponseMsg resourcesResponseMsg = 7; + GetResourceResponseMsg resourceResponseMsg = 7; } /* Messages that are handled by ThingsBoard Core Service */ diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportResourceCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportResourceCache.java new file mode 100644 index 0000000000..447ad3e6fa --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportResourceCache.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport; + +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; + +public interface TransportResourceCache { + + Resource get(TenantId tenantId, ResourceType resourceType, String resourceId); + + void update(TenantId tenantId, ResourceType resourceType, String resourceI); + + void evict(TenantId tenantId, ResourceType resourceType, String resourceId); +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java index ab728af00d..04b08272b1 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportService.java @@ -25,8 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestM import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetResourcesRequestMsg; -import org.thingsboard.server.gen.transport.TransportProtos.GetResourcesResponseMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg; +import org.thingsboard.server.gen.transport.TransportProtos.GetResourceResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.LwM2MRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.LwM2MResponseMsg; import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; @@ -53,7 +53,7 @@ public interface TransportService { GetEntityProfileResponseMsg getEntityProfile(GetEntityProfileRequestMsg msg); - GetResourcesResponseMsg getResources(GetResourcesRequestMsg msg); + GetResourceResponseMsg getResource(GetResourceRequestMsg msg); void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg, TransportServiceCallback callback); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java new file mode 100644 index 0000000000..9d14acbaf1 --- /dev/null +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportResourceCache.java @@ -0,0 +1,130 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.transport.service; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; +import org.thingsboard.server.common.transport.TransportResourceCache; +import org.thingsboard.server.common.transport.TransportService; +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.util.TbTransportComponent; + +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Slf4j +@Component +@TbTransportComponent +public class DefaultTransportResourceCache implements TransportResourceCache { + + private final Lock resourceFetchLock = new ReentrantLock(); + private final ConcurrentMap resources = new ConcurrentHashMap<>(); + private final Set keys = ConcurrentHashMap.newKeySet(); + private final DataDecodingEncodingService dataDecodingEncodingService; + private final TransportService transportService; + + public DefaultTransportResourceCache(DataDecodingEncodingService dataDecodingEncodingService, @Lazy TransportService transportService) { + this.dataDecodingEncodingService = dataDecodingEncodingService; + this.transportService = transportService; + } + + @Override + public Resource get(TenantId tenantId, ResourceType resourceType, String resourceId) { + ResourceKey resourceKey = new ResourceKey(tenantId, resourceType, resourceId); + Resource resource; + + if (keys.contains(resourceKey)) { + resource = resources.get(resourceKey); + if (resource == null) { + resource = resources.get(resourceKey.getSystemKey()); + } + } else { + resourceFetchLock.lock(); + try { + if (keys.contains(resourceKey)) { + resource = resources.get(resourceKey); + if (resource == null) { + resource = resources.get(resourceKey.getSystemKey()); + } + } else { + resource = fetchResource(resourceKey); + keys.add(resourceKey); + } + } finally { + resourceFetchLock.unlock(); + } + } + + return resource; + } + + private Resource fetchResource(ResourceKey resourceKey) { + UUID tenantId = resourceKey.getTenantId().getId(); + TransportProtos.GetResourceRequestMsg.Builder builder = TransportProtos.GetResourceRequestMsg.newBuilder(); + builder + .setTenantIdLSB(tenantId.getLeastSignificantBits()) + .setTenantIdMSB(tenantId.getMostSignificantBits()) + .setResourceType(resourceKey.resourceType.name()) + .setResourceId(resourceKey.resourceId); + TransportProtos.GetResourceResponseMsg responseMsg = transportService.getResource(builder.build()); + + Optional optionalResource = dataDecodingEncodingService.decode(responseMsg.getResource().toByteArray()); + if (optionalResource.isPresent()) { + Resource resource = optionalResource.get(); + resources.put(new ResourceKey(resource.getTenantId(), resource.getResourceType(), resource.getResourceId()), resource); + return resource; + } + + return null; + } + + @Override + public void update(TenantId tenantId, ResourceType resourceType, String resourceId) { + ResourceKey resourceKey = new ResourceKey(tenantId, resourceType, resourceId); + if (keys.contains(resourceKey) || resources.containsKey(resourceKey)) { + fetchResource(resourceKey); + } + } + + @Override + public void evict(TenantId tenantId, ResourceType resourceType, String resourceId) { + ResourceKey resourceKey = new ResourceKey(tenantId, resourceType, resourceId); + keys.remove(resourceKey); + resources.remove(resourceKey); + } + + @Data + private static class ResourceKey { + private final TenantId tenantId; + private final ResourceType resourceType; + private final String resourceId; + + public ResourceKey getSystemKey() { + return new ResourceKey(TenantId.SYS_TENANT_ID, resourceType, resourceId); + } + } +} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index b273ee48cf..539921492b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceQueue; @@ -49,6 +50,7 @@ import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.stats.StatsType; import org.thingsboard.server.common.transport.SessionMsgListener; import org.thingsboard.server.common.transport.TransportDeviceProfileCache; +import org.thingsboard.server.common.transport.TransportResourceCache; import org.thingsboard.server.common.transport.TransportService; import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.TransportTenantProfileCache; @@ -61,7 +63,6 @@ import org.thingsboard.server.common.transport.util.JsonUtils; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; -import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; @@ -131,6 +132,7 @@ public class DefaultTransportService implements TransportService { private final TransportRateLimitService rateLimitService; private final DataDecodingEncodingService dataDecodingEncodingService; private final SchedulerComponent scheduler; + private final TransportResourceCache transportResourceCache; protected TbQueueRequestTemplate, TbProtoQueueMsg> transportApiRequestTemplate; protected TbQueueProducer> ruleEngineMsgProducer; @@ -157,7 +159,7 @@ public class DefaultTransportService implements TransportService { TransportDeviceProfileCache deviceProfileCache, TransportTenantProfileCache tenantProfileCache, TbApiUsageClient apiUsageClient, TransportRateLimitService rateLimitService, - DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler) { + DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler, TransportResourceCache transportResourceCache) { this.serviceInfoProvider = serviceInfoProvider; this.queueProvider = queueProvider; this.producerProvider = producerProvider; @@ -169,6 +171,7 @@ public class DefaultTransportService implements TransportService { this.rateLimitService = rateLimitService; this.dataDecodingEncodingService = dataDecodingEncodingService; this.scheduler = scheduler; + this.transportResourceCache = transportResourceCache; } @PostConstruct @@ -255,12 +258,12 @@ public class DefaultTransportService implements TransportService { } @Override - public TransportProtos.GetResourcesResponseMsg getResources(TransportProtos.GetResourcesRequestMsg msg) { + public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) { TbProtoQueueMsg protoMsg = - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourcesRequestMsg(msg).build()); + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build()); try { TbProtoQueueMsg response = transportApiRequestTemplate.send(protoMsg).get(); - return response.getValue().getResourcesResponseMsg(); + return response.getValue().getResourceResponseMsg(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } @@ -701,9 +704,17 @@ public class DefaultTransportService implements TransportService { rateLimitService.remove(new DeviceId(entityUuid)); } } else if (toSessionMsg.hasResourceUpdateMsg()) { - //TODO: update resource cache + TransportProtos.ResourceUpdateMsg msg = toSessionMsg.getResourceUpdateMsg(); + TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); + ResourceType resourceType = ResourceType.valueOf(msg.getResourceType()); + String resourceId = msg.getResourceId(); + transportResourceCache.update(tenantId, resourceType, resourceId); } else if (toSessionMsg.hasResourceDeleteMsg()) { - //TODO: remove resource from cache + TransportProtos.ResourceDeleteMsg msg = toSessionMsg.getResourceDeleteMsg(); + TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB())); + ResourceType resourceType = ResourceType.valueOf(msg.getResourceType()); + String resourceId = msg.getResourceId(); + transportResourceCache.evict(tenantId, resourceType, resourceId); } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId); diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/ProtoWithFSTService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/ProtoWithFSTService.java index a3b65c2248..4d0b8bb86a 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/ProtoWithFSTService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/util/ProtoWithFSTService.java @@ -18,8 +18,6 @@ package org.thingsboard.server.common.transport.util; import lombok.extern.slf4j.Slf4j; import org.nustaq.serialization.FSTConfiguration; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.msg.TbActorMsg; -import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import java.util.Optional; @@ -33,8 +31,8 @@ public class ProtoWithFSTService implements DataDecodingEncodingService { public Optional decode(byte[] byteArray) { try { @SuppressWarnings("unchecked") - T msg = (T) config.asObject(byteArray); - return Optional.of(msg); + T msg = byteArray != null && byteArray.length > 0 ? (T) config.asObject(byteArray) : null; + return Optional.ofNullable(msg); } catch (IllegalArgumentException e) { log.error("Error during deserialization message, [{}]", e.getMessage()); return Optional.empty(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceCompositeKey.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceCompositeKey.java index 828f1f4341..6592cf757d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceCompositeKey.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceCompositeKey.java @@ -18,8 +18,7 @@ package org.thingsboard.server.dao.model.sql; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.transport.resource.Resource; +import org.thingsboard.server.common.data.Resource; import javax.persistence.Transient; import java.io.Serializable; diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceEntity.java index a877afa027..c3b242bfe5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/ResourceEntity.java @@ -17,8 +17,8 @@ package org.thingsboard.server.dao.model.sql; import lombok.Data; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.dao.model.ToData; import javax.persistence.Column; diff --git a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java index f5f23836b6..41eda77d54 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/resource/BaseResourceService.java @@ -20,8 +20,8 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.dao.exception.DataValidationException; import static org.thingsboard.server.dao.device.DeviceServiceImpl.INCORRECT_TENANT_ID; diff --git a/dao/src/main/java/org/thingsboard/server/dao/resource/ResourceDao.java b/dao/src/main/java/org/thingsboard/server/dao/resource/ResourceDao.java index f8fae8b19d..0c03a474c2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/resource/ResourceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/resource/ResourceDao.java @@ -18,8 +18,8 @@ package org.thingsboard.server.dao.resource; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; public interface ResourceDao { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/resource/ResourceDaoImpl.java b/dao/src/main/java/org/thingsboard/server/dao/sql/resource/ResourceDaoImpl.java index 0f97d51f8a..c5436b2535 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/resource/ResourceDaoImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/resource/ResourceDaoImpl.java @@ -21,8 +21,8 @@ import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.transport.resource.Resource; -import org.thingsboard.server.common.data.transport.resource.ResourceType; +import org.thingsboard.server.common.data.Resource; +import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.ResourceCompositeKey; import org.thingsboard.server.dao.model.sql.ResourceEntity;