created TransportResourceCache (#4232)
* created TransportResourceCache * return resource by sysadmin if not found by tenant in DefaultTransportApiService
This commit is contained in:
parent
e2dd5b96ae
commit
43fe84a284
@ -27,8 +27,8 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.dao.resource.ResourceService;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
|
||||
|
||||
@ -28,8 +28,8 @@ import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.oauth2.OAuth2ClientRegistrationTemplate;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.rule.RuleChainMetaData;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.common.data.widget.WidgetType;
|
||||
import org.thingsboard.server.common.data.widget.WidgetsBundle;
|
||||
import org.thingsboard.server.dao.dashboard.DashboardService;
|
||||
|
||||
@ -34,7 +34,7 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
|
||||
@ -24,7 +24,7 @@ import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
|
||||
@ -39,12 +39,11 @@ import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentials;
|
||||
import org.thingsboard.server.common.data.security.DeviceCredentialsType;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.common.msg.EncryptionUtil;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
@ -65,7 +64,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequ
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetResourcesRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionResponseStatus;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
|
||||
@ -82,14 +81,11 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.TbClusterService;
|
||||
import org.thingsboard.server.service.state.DeviceStateService;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Created by ashvayka on 05.10.18.
|
||||
@ -167,8 +163,8 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
} else if (transportApiRequestMsg.hasProvisionDeviceRequestMsg()) {
|
||||
return Futures.transform(handle(transportApiRequestMsg.getProvisionDeviceRequestMsg()),
|
||||
value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
|
||||
} else if (transportApiRequestMsg.hasResourcesRequestMsg()) {
|
||||
return Futures.transform(handle(transportApiRequestMsg.getResourcesRequestMsg()),
|
||||
} else if (transportApiRequestMsg.hasResourceRequestMsg()) {
|
||||
return Futures.transform(handle(transportApiRequestMsg.getResourceRequestMsg()),
|
||||
value -> new TbProtoQueueMsg<>(tbProtoQueueMsg.getKey(), value, tbProtoQueueMsg.getHeaders()), MoreExecutors.directExecutor());
|
||||
}
|
||||
return Futures.transform(getEmptyTransportApiResponseFuture(),
|
||||
@ -366,38 +362,22 @@ public class DefaultTransportApiService implements TransportApiService {
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setEntityProfileResponseMsg(builder).build());
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetResourcesRequestMsg requestMsg) {
|
||||
private ListenableFuture<TransportApiResponseMsg> handle(GetResourceRequestMsg requestMsg) {
|
||||
TenantId tenantId = new TenantId(new UUID(requestMsg.getTenantIdMSB(), requestMsg.getTenantIdLSB()));
|
||||
TransportProtos.GetResourcesResponseMsg.Builder builder = TransportProtos.GetResourcesResponseMsg.newBuilder();
|
||||
String resourceType = requestMsg.getResourceType();
|
||||
ResourceType resourceType = ResourceType.valueOf(requestMsg.getResourceType());
|
||||
String resourceId = requestMsg.getResourceId();
|
||||
TransportProtos.GetResourceResponseMsg.Builder builder = TransportProtos.GetResourceResponseMsg.newBuilder();
|
||||
Resource resource = resourceService.getResource(tenantId, resourceType, resourceId);
|
||||
|
||||
List<TransportProtos.ResourceMsg> resources;
|
||||
|
||||
if (resourceType != null && resourceId != null) {
|
||||
resources = Collections.singletonList(toProto(
|
||||
resourceService.getResource(tenantId, ResourceType.valueOf(resourceType), resourceId)));
|
||||
} else {
|
||||
//TODO: add page link params to request proto if need or remove this
|
||||
resources = resourceService.findResourcesByTenantId(tenantId, new PageLink(100))
|
||||
.getData()
|
||||
.stream()
|
||||
.map(this::toProto)
|
||||
.collect(Collectors.toList());
|
||||
if (resource == null && !tenantId.equals(TenantId.SYS_TENANT_ID)) {
|
||||
resource = resourceService.getResource(TenantId.SYS_TENANT_ID, resourceType, resourceId);
|
||||
}
|
||||
|
||||
builder.addAllResources(resources);
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourcesResponseMsg(builder).build());
|
||||
if (resource != null) {
|
||||
builder.setResource(ByteString.copyFrom(dataDecodingEncodingService.encode(resource)));
|
||||
}
|
||||
|
||||
private TransportProtos.ResourceMsg toProto(Resource resource) {
|
||||
return TransportProtos.ResourceMsg.newBuilder()
|
||||
.setTenantIdMSB(resource.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(resource.getTenantId().getId().getLeastSignificantBits())
|
||||
.setResourceType(resource.getResourceType().name())
|
||||
.setResourceId(resource.getResourceId())
|
||||
.setValue(resource.getValue())
|
||||
.build();
|
||||
return Futures.immediateFuture(TransportApiResponseMsg.newBuilder().setResourceResponseMsg(builder).build());
|
||||
}
|
||||
|
||||
private ListenableFuture<TransportApiResponseMsg> getDeviceInfo(DeviceId deviceId, DeviceCredentials credentials) {
|
||||
|
||||
@ -18,8 +18,8 @@ package org.thingsboard.server.dao.resource;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
|
||||
|
||||
public interface ResourceService {
|
||||
|
||||
@ -13,14 +13,18 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.transport.resource;
|
||||
package org.thingsboard.server.common.data;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.HasTenantId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
@Data
|
||||
public class Resource implements HasTenantId {
|
||||
public class Resource implements HasTenantId, Serializable {
|
||||
|
||||
private static final long serialVersionUID = 7379609705527272306L;
|
||||
|
||||
private TenantId tenantId;
|
||||
private ResourceType resourceType;
|
||||
private String resourceId;
|
||||
@ -13,7 +13,7 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.transport.resource;
|
||||
package org.thingsboard.server.common.data;
|
||||
|
||||
public enum ResourceType {
|
||||
LWM2M_MODEL, JKS, PKCS_12
|
||||
@ -201,23 +201,15 @@ message LwM2MResponseMsg {
|
||||
LwM2MRegistrationResponseMsg registrationMsg = 1;
|
||||
}
|
||||
|
||||
message ResourceMsg {
|
||||
int64 tenantIdMSB = 1;
|
||||
int64 tenantIdLSB = 2;
|
||||
string resourceType = 3;
|
||||
string resourceId = 4;
|
||||
string value = 5;
|
||||
}
|
||||
|
||||
message GetResourcesRequestMsg {
|
||||
message GetResourceRequestMsg {
|
||||
int64 tenantIdMSB = 1;
|
||||
int64 tenantIdLSB = 2;
|
||||
string resourceType = 3;
|
||||
string resourceId = 4;
|
||||
}
|
||||
|
||||
message GetResourcesResponseMsg {
|
||||
repeated ResourceMsg resources = 1;
|
||||
message GetResourceResponseMsg {
|
||||
bytes resource = 1;
|
||||
}
|
||||
|
||||
message ValidateDeviceLwM2MCredentialsRequestMsg {
|
||||
@ -558,7 +550,7 @@ message TransportApiRequestMsg {
|
||||
ValidateBasicMqttCredRequestMsg validateBasicMqttCredRequestMsg = 6;
|
||||
ProvisionDeviceRequestMsg provisionDeviceRequestMsg = 7;
|
||||
ValidateDeviceLwM2MCredentialsRequestMsg validateDeviceLwM2MCredentialsRequestMsg = 8;
|
||||
GetResourcesRequestMsg resourcesRequestMsg = 9;
|
||||
GetResourceRequestMsg resourceRequestMsg = 9;
|
||||
}
|
||||
|
||||
/* Response from ThingsBoard Core Service to Transport Service */
|
||||
@ -568,7 +560,7 @@ message TransportApiResponseMsg {
|
||||
GetEntityProfileResponseMsg entityProfileResponseMsg = 3;
|
||||
ProvisionDeviceResponseMsg provisionDeviceResponseMsg = 4;
|
||||
LwM2MResponseMsg lwM2MResponseMsg = 6;
|
||||
GetResourcesResponseMsg resourcesResponseMsg = 7;
|
||||
GetResourceResponseMsg resourceResponseMsg = 7;
|
||||
}
|
||||
|
||||
/* Messages that are handled by ThingsBoard Core Service */
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.transport;
|
||||
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
|
||||
public interface TransportResourceCache {
|
||||
|
||||
Resource get(TenantId tenantId, ResourceType resourceType, String resourceId);
|
||||
|
||||
void update(TenantId tenantId, ResourceType resourceType, String resourceI);
|
||||
|
||||
void evict(TenantId tenantId, ResourceType resourceType, String resourceId);
|
||||
}
|
||||
@ -25,8 +25,8 @@ import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeRequestM
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetEntityProfileResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetOrCreateDeviceFromGatewayRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetResourcesRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetResourcesResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetResourceRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.GetResourceResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.LwM2MRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.LwM2MResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
|
||||
@ -53,7 +53,7 @@ public interface TransportService {
|
||||
|
||||
GetEntityProfileResponseMsg getEntityProfile(GetEntityProfileRequestMsg msg);
|
||||
|
||||
GetResourcesResponseMsg getResources(GetResourcesRequestMsg msg);
|
||||
GetResourceResponseMsg getResource(GetResourceRequestMsg msg);
|
||||
|
||||
void process(DeviceTransportType transportType, ValidateDeviceTokenRequestMsg msg,
|
||||
TransportServiceCallback<ValidateDeviceCredentialsResponse> callback);
|
||||
|
||||
@ -0,0 +1,130 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.transport.service;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.common.transport.TransportResourceCache;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.util.TbTransportComponent;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
@TbTransportComponent
|
||||
public class DefaultTransportResourceCache implements TransportResourceCache {
|
||||
|
||||
private final Lock resourceFetchLock = new ReentrantLock();
|
||||
private final ConcurrentMap<ResourceKey, Resource> resources = new ConcurrentHashMap<>();
|
||||
private final Set<ResourceKey> keys = ConcurrentHashMap.newKeySet();
|
||||
private final DataDecodingEncodingService dataDecodingEncodingService;
|
||||
private final TransportService transportService;
|
||||
|
||||
public DefaultTransportResourceCache(DataDecodingEncodingService dataDecodingEncodingService, @Lazy TransportService transportService) {
|
||||
this.dataDecodingEncodingService = dataDecodingEncodingService;
|
||||
this.transportService = transportService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource get(TenantId tenantId, ResourceType resourceType, String resourceId) {
|
||||
ResourceKey resourceKey = new ResourceKey(tenantId, resourceType, resourceId);
|
||||
Resource resource;
|
||||
|
||||
if (keys.contains(resourceKey)) {
|
||||
resource = resources.get(resourceKey);
|
||||
if (resource == null) {
|
||||
resource = resources.get(resourceKey.getSystemKey());
|
||||
}
|
||||
} else {
|
||||
resourceFetchLock.lock();
|
||||
try {
|
||||
if (keys.contains(resourceKey)) {
|
||||
resource = resources.get(resourceKey);
|
||||
if (resource == null) {
|
||||
resource = resources.get(resourceKey.getSystemKey());
|
||||
}
|
||||
} else {
|
||||
resource = fetchResource(resourceKey);
|
||||
keys.add(resourceKey);
|
||||
}
|
||||
} finally {
|
||||
resourceFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
return resource;
|
||||
}
|
||||
|
||||
private Resource fetchResource(ResourceKey resourceKey) {
|
||||
UUID tenantId = resourceKey.getTenantId().getId();
|
||||
TransportProtos.GetResourceRequestMsg.Builder builder = TransportProtos.GetResourceRequestMsg.newBuilder();
|
||||
builder
|
||||
.setTenantIdLSB(tenantId.getLeastSignificantBits())
|
||||
.setTenantIdMSB(tenantId.getMostSignificantBits())
|
||||
.setResourceType(resourceKey.resourceType.name())
|
||||
.setResourceId(resourceKey.resourceId);
|
||||
TransportProtos.GetResourceResponseMsg responseMsg = transportService.getResource(builder.build());
|
||||
|
||||
Optional<Resource> optionalResource = dataDecodingEncodingService.decode(responseMsg.getResource().toByteArray());
|
||||
if (optionalResource.isPresent()) {
|
||||
Resource resource = optionalResource.get();
|
||||
resources.put(new ResourceKey(resource.getTenantId(), resource.getResourceType(), resource.getResourceId()), resource);
|
||||
return resource;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(TenantId tenantId, ResourceType resourceType, String resourceId) {
|
||||
ResourceKey resourceKey = new ResourceKey(tenantId, resourceType, resourceId);
|
||||
if (keys.contains(resourceKey) || resources.containsKey(resourceKey)) {
|
||||
fetchResource(resourceKey);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(TenantId tenantId, ResourceType resourceType, String resourceId) {
|
||||
ResourceKey resourceKey = new ResourceKey(tenantId, resourceType, resourceId);
|
||||
keys.remove(resourceKey);
|
||||
resources.remove(resourceKey);
|
||||
}
|
||||
|
||||
@Data
|
||||
private static class ResourceKey {
|
||||
private final TenantId tenantId;
|
||||
private final ResourceType resourceType;
|
||||
private final String resourceId;
|
||||
|
||||
public ResourceKey getSystemKey() {
|
||||
return new ResourceKey(TenantId.SYS_TENANT_ID, resourceType, resourceId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.TenantProfileId;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
|
||||
@ -49,6 +50,7 @@ import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.common.stats.StatsType;
|
||||
import org.thingsboard.server.common.transport.SessionMsgListener;
|
||||
import org.thingsboard.server.common.transport.TransportDeviceProfileCache;
|
||||
import org.thingsboard.server.common.transport.TransportResourceCache;
|
||||
import org.thingsboard.server.common.transport.TransportService;
|
||||
import org.thingsboard.server.common.transport.TransportServiceCallback;
|
||||
import org.thingsboard.server.common.transport.TransportTenantProfileCache;
|
||||
@ -61,7 +63,6 @@ import org.thingsboard.server.common.transport.util.JsonUtils;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
|
||||
@ -131,6 +132,7 @@ public class DefaultTransportService implements TransportService {
|
||||
private final TransportRateLimitService rateLimitService;
|
||||
private final DataDecodingEncodingService dataDecodingEncodingService;
|
||||
private final SchedulerComponent scheduler;
|
||||
private final TransportResourceCache transportResourceCache;
|
||||
|
||||
protected TbQueueRequestTemplate<TbProtoQueueMsg<TransportApiRequestMsg>, TbProtoQueueMsg<TransportApiResponseMsg>> transportApiRequestTemplate;
|
||||
protected TbQueueProducer<TbProtoQueueMsg<ToRuleEngineMsg>> ruleEngineMsgProducer;
|
||||
@ -157,7 +159,7 @@ public class DefaultTransportService implements TransportService {
|
||||
TransportDeviceProfileCache deviceProfileCache,
|
||||
TransportTenantProfileCache tenantProfileCache,
|
||||
TbApiUsageClient apiUsageClient, TransportRateLimitService rateLimitService,
|
||||
DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler) {
|
||||
DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler, TransportResourceCache transportResourceCache) {
|
||||
this.serviceInfoProvider = serviceInfoProvider;
|
||||
this.queueProvider = queueProvider;
|
||||
this.producerProvider = producerProvider;
|
||||
@ -169,6 +171,7 @@ public class DefaultTransportService implements TransportService {
|
||||
this.rateLimitService = rateLimitService;
|
||||
this.dataDecodingEncodingService = dataDecodingEncodingService;
|
||||
this.scheduler = scheduler;
|
||||
this.transportResourceCache = transportResourceCache;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
@ -255,12 +258,12 @@ public class DefaultTransportService implements TransportService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TransportProtos.GetResourcesResponseMsg getResources(TransportProtos.GetResourcesRequestMsg msg) {
|
||||
public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) {
|
||||
TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =
|
||||
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourcesRequestMsg(msg).build());
|
||||
new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build());
|
||||
try {
|
||||
TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
|
||||
return response.getValue().getResourcesResponseMsg();
|
||||
return response.getValue().getResourceResponseMsg();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@ -701,9 +704,17 @@ public class DefaultTransportService implements TransportService {
|
||||
rateLimitService.remove(new DeviceId(entityUuid));
|
||||
}
|
||||
} else if (toSessionMsg.hasResourceUpdateMsg()) {
|
||||
//TODO: update resource cache
|
||||
TransportProtos.ResourceUpdateMsg msg = toSessionMsg.getResourceUpdateMsg();
|
||||
TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
|
||||
ResourceType resourceType = ResourceType.valueOf(msg.getResourceType());
|
||||
String resourceId = msg.getResourceId();
|
||||
transportResourceCache.update(tenantId, resourceType, resourceId);
|
||||
} else if (toSessionMsg.hasResourceDeleteMsg()) {
|
||||
//TODO: remove resource from cache
|
||||
TransportProtos.ResourceDeleteMsg msg = toSessionMsg.getResourceDeleteMsg();
|
||||
TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
|
||||
ResourceType resourceType = ResourceType.valueOf(msg.getResourceType());
|
||||
String resourceId = msg.getResourceId();
|
||||
transportResourceCache.evict(tenantId, resourceType, resourceId);
|
||||
} else {
|
||||
//TODO: should we notify the device actor about missed session?
|
||||
log.debug("[{}] Missing session.", sessionId);
|
||||
|
||||
@ -18,8 +18,6 @@ package org.thingsboard.server.common.transport.util;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.nustaq.serialization.FSTConfiguration;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@ -33,8 +31,8 @@ public class ProtoWithFSTService implements DataDecodingEncodingService {
|
||||
public <T> Optional<T> decode(byte[] byteArray) {
|
||||
try {
|
||||
@SuppressWarnings("unchecked")
|
||||
T msg = (T) config.asObject(byteArray);
|
||||
return Optional.of(msg);
|
||||
T msg = byteArray != null && byteArray.length > 0 ? (T) config.asObject(byteArray) : null;
|
||||
return Optional.ofNullable(msg);
|
||||
} catch (IllegalArgumentException e) {
|
||||
log.error("Error during deserialization message, [{}]", e.getMessage());
|
||||
return Optional.empty();
|
||||
|
||||
@ -18,8 +18,7 @@ package org.thingsboard.server.dao.model.sql;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
|
||||
import javax.persistence.Transient;
|
||||
import java.io.Serializable;
|
||||
|
||||
@ -17,8 +17,8 @@ package org.thingsboard.server.dao.model.sql;
|
||||
|
||||
import lombok.Data;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.dao.model.ToData;
|
||||
|
||||
import javax.persistence.Column;
|
||||
|
||||
@ -20,8 +20,8 @@ import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
|
||||
import static org.thingsboard.server.dao.device.DeviceServiceImpl.INCORRECT_TENANT_ID;
|
||||
|
||||
@ -18,8 +18,8 @@ package org.thingsboard.server.dao.resource;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
|
||||
public interface ResourceDao {
|
||||
|
||||
|
||||
@ -21,8 +21,8 @@ import org.springframework.transaction.annotation.Transactional;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.transport.resource.Resource;
|
||||
import org.thingsboard.server.common.data.transport.resource.ResourceType;
|
||||
import org.thingsboard.server.common.data.Resource;
|
||||
import org.thingsboard.server.common.data.ResourceType;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.model.sql.ResourceCompositeKey;
|
||||
import org.thingsboard.server.dao.model.sql.ResourceEntity;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user