diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 142a8520bb..8434f67d1d 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -105,6 +105,7 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.component.ComponentDiscoveryService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.AccessControlService; @@ -210,6 +211,9 @@ public abstract class BaseController { @Autowired protected TbQueueProducerProvider producerProvider; + @Autowired + protected TbDeviceProfileCache deviceProfileCache; + @Value("${server.log_controller_error_stack_trace}") @Getter private boolean logControllerErrorStackTrace; diff --git a/application/src/main/java/org/thingsboard/server/controller/DeviceProfileController.java b/application/src/main/java/org/thingsboard/server/controller/DeviceProfileController.java index 1636e2724c..b474a0c0f1 100644 --- a/application/src/main/java/org/thingsboard/server/controller/DeviceProfileController.java +++ b/application/src/main/java/org/thingsboard/server/controller/DeviceProfileController.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; @@ -86,13 +87,17 @@ public class DeviceProfileController extends BaseController { @ResponseBody public DeviceProfile saveDeviceProfile(@RequestBody DeviceProfile deviceProfile) throws ThingsboardException { try { + boolean created = deviceProfile.getId() == null; deviceProfile.setTenantId(getTenantId()); checkEntity(deviceProfile.getId(), deviceProfile, Resource.DEVICE_PROFILE); DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile)); + deviceProfileCache.put(savedDeviceProfile); tbClusterService.onDeviceProfileChange(savedDeviceProfile, null); + tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), savedDeviceProfile.getId(), + created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED); logEntityAction(savedDeviceProfile.getId(), savedDeviceProfile, null, @@ -115,6 +120,10 @@ public class DeviceProfileController extends BaseController { DeviceProfileId deviceProfileId = new DeviceProfileId(toUUID(strDeviceProfileId)); DeviceProfile deviceProfile = checkDeviceProfileId(deviceProfileId, Operation.DELETE); deviceProfileService.deleteDeviceProfile(getTenantId(), deviceProfileId); + deviceProfileCache.evict(deviceProfileId); + + tbClusterService.onDeviceProfileDelete(deviceProfile, null); + tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), deviceProfile.getId(), ComponentLifecycleEvent.DELETED); logEntityAction(deviceProfileId, deviceProfile, null, @@ -180,10 +189,10 @@ public class DeviceProfileController extends BaseController { @RequestMapping(value = "/deviceProfileInfos", params = {"pageSize", "page"}, method = RequestMethod.GET) @ResponseBody public PageData getDeviceProfileInfos(@RequestParam int pageSize, - @RequestParam int page, - @RequestParam(required = false) String textSearch, - @RequestParam(required = false) String sortProperty, - @RequestParam(required = false) String sortOrder) throws ThingsboardException { + @RequestParam int page, + @RequestParam(required = false) String textSearch, + @RequestParam(required = false) String sortProperty, + @RequestParam(required = false) String sortOrder) throws ThingsboardException { try { PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder); return checkNotNull(deviceProfileService.findDeviceProfileInfos(getTenantId(), pageLink)); diff --git a/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java b/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java new file mode 100644 index 0000000000..085bb26e8c --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/profile/DefaultTbDeviceProfileCache.java @@ -0,0 +1,99 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.profile; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.dao.device.DeviceProfileService; +import org.thingsboard.server.dao.device.DeviceService; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +@Service +@Slf4j +public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { + + private final Lock deviceProfileFetchLock = new ReentrantLock(); + private final DeviceProfileService deviceProfileService; + private final DeviceService deviceService; + + private final ConcurrentMap deviceProfilesMap = new ConcurrentHashMap<>(); + private final ConcurrentMap devicesMap = new ConcurrentHashMap<>(); + + public DefaultTbDeviceProfileCache(DeviceProfileService deviceProfileService, DeviceService deviceService) { + this.deviceProfileService = deviceProfileService; + this.deviceService = deviceService; + } + + @Override + public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) { + DeviceProfile profile = deviceProfilesMap.get(deviceProfileId); + if (profile == null) { + deviceProfileFetchLock.lock(); + profile = deviceProfilesMap.get(deviceProfileId); + if (profile == null) { + try { + profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); + if (profile != null) { + deviceProfilesMap.put(deviceProfileId, profile); + } + } finally { + deviceProfileFetchLock.unlock(); + } + } + } + return profile; + } + + @Override + public DeviceProfile get(TenantId tenantId, DeviceId deviceId) { + DeviceProfileId profileId = devicesMap.get(deviceId); + if (profileId == null) { + Device device = deviceService.findDeviceById(tenantId, deviceId); + if (device != null) { + profileId = device.getDeviceProfileId(); + devicesMap.put(deviceId, profileId); + } + } + return get(tenantId, profileId); + } + + @Override + public void put(DeviceProfile profile) { + if (profile.getId() != null) { + deviceProfilesMap.put(profile.getId(), profile); + } + } + + @Override + public void evict(DeviceProfileId profileId) { + deviceProfilesMap.remove(profileId); + } + + @Override + public void evict(DeviceId deviceId) { + devicesMap.remove(deviceId); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java b/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java new file mode 100644 index 0000000000..ea14a71072 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/profile/TbDeviceProfileCache.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.profile; + +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.TenantId; + +public interface TbDeviceProfileCache { + + DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId); + + DeviceProfile get(TenantId tenantId, DeviceId deviceId); + + void put(DeviceProfile profile); + + void evict(DeviceProfileId id); + + void evict(DeviceId id); + +} diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 4a56b94ac9..28cd998e46 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -23,13 +23,17 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; @@ -42,7 +46,7 @@ import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; -import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import java.util.HashSet; @@ -66,11 +70,13 @@ public class DefaultTbClusterService implements TbClusterService { private final TbQueueProducerProvider producerProvider; private final PartitionService partitionService; private final DataDecodingEncodingService encodingService; + private final TbDeviceProfileCache deviceProfileCache; - public DefaultTbClusterService(TbQueueProducerProvider producerProvider, PartitionService partitionService, DataDecodingEncodingService encodingService) { + public DefaultTbClusterService(TbQueueProducerProvider producerProvider, PartitionService partitionService, DataDecodingEncodingService encodingService, TbDeviceProfileCache deviceProfileCache) { this.producerProvider = producerProvider; this.partitionService = partitionService; this.encodingService = encodingService; + this.deviceProfileCache = deviceProfileCache; } @Override @@ -126,6 +132,12 @@ public class DefaultTbClusterService implements TbClusterService { log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg); return; } + } else { + if (entityId.getEntityType().equals(EntityType.DEVICE)) { + tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()))); + } else if (entityId.getEntityType().equals(EntityType.DEVICE_PROFILE)) { + tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId()))); + } } TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId); log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); @@ -137,6 +149,16 @@ public class DefaultTbClusterService implements TbClusterService { toRuleEngineMsgs.incrementAndGet(); } + private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) { + if (deviceProfile != null) { + RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId(); + if (targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId())) { + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId); + } + } + return tbMsg; + } + @Override public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId); @@ -167,11 +189,27 @@ public class DefaultTbClusterService implements TbClusterService { @Override public void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback) { - log.trace("[{}][{}] Processing device profile [{}] event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); + log.trace("[{}][{}] Processing device profile [{}] change event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); + TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder() + .setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build(); + broadcast(transportMsg); + } + + @Override + public void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback) { + log.trace("[{}][{}] Processing device profile [{}] delete event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName()); + TransportProtos.DeviceProfileDeleteMsg profileDeleteMsg = TransportProtos.DeviceProfileDeleteMsg.newBuilder() + .setProfileIdMSB(deviceProfile.getId().getId().getMostSignificantBits()) + .setProfileIdLSB(deviceProfile.getId().getId().getLeastSignificantBits()) + .build(); + ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileDeleteMsg(profileDeleteMsg).build(); + broadcast(transportMsg); + } + + private void broadcast(ToTransportMsg transportMsg) { TbQueueProducer> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer(); Set tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT); - TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder().setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build(); - ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build(); for (String transportServiceId : tbTransportServices) { TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId); toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 1dba860fcd..5d240790f2 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -21,10 +21,13 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.RpcError; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.dao.util.mapping.JacksonUtil; @@ -48,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; @@ -92,8 +96,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray()); - if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tellWithHighPriority(actorMsg.get()); - } + handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg()); callback.onSuccess(); } if (statsEnabled) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 69d2b04d2a..fd356cabcc 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.queue; +import com.google.protobuf.ByteString; import com.google.protobuf.ProtocolStringList; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; @@ -38,6 +39,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; import org.thingsboard.server.queue.util.TbRuleEngineComponent; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.*; import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService; @@ -80,8 +82,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService, ActorSystemContext actorContext, DataDecodingEncodingService encodingService, TbRuleEngineDeviceRpcService tbDeviceRpcService, - StatsFactory statsFactory) { - super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); + StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) { + super(actorContext, encodingService, deviceProfileCache, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer()); this.statisticsService = statisticsService; this.ruleEngineSettings = ruleEngineSettings; this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory; @@ -239,11 +241,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< protected void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) throws Exception { ToRuleEngineNotificationMsg nfMsg = msg.getValue(); if (nfMsg.getComponentLifecycleMsg() != null && !nfMsg.getComponentLifecycleMsg().isEmpty()) { - Optional actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray()); - if (actorMsg.isPresent()) { - log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get()); - actorContext.tellWithHighPriority(actorMsg.get()); - } + handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg()); callback.onSuccess(); } else if (nfMsg.hasFromDeviceRpcResponse()) { TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java index 5f72cfa693..cf212a06f5 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.queue; import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; @@ -52,4 +53,5 @@ public interface TbClusterService { void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback); + void onDeviceProfileDelete(DeviceProfile deviceProfileId, TbQueueCallback callback); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 5c2ec700f8..47beb9e796 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -15,23 +15,31 @@ */ package org.thingsboard.server.service.queue.processing; +import com.google.protobuf.ByteString; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.context.event.EventListener; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.actors.ActorSystemContext; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.msg.TbActorMsg; +import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.PartitionChangeEvent; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; +import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbPackCallback; import org.thingsboard.server.service.queue.TbPackProcessingContext; import javax.annotation.PreDestroy; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -51,12 +59,15 @@ public abstract class AbstractConsumerService> nfConsumer; - public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, TbQueueConsumer> nfConsumer) { + public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, + TbDeviceProfileCache deviceProfileCache, TbQueueConsumer> nfConsumer) { this.actorContext = actorContext; this.encodingService = encodingService; + this.deviceProfileCache = deviceProfileCache; this.nfConsumer = nfConsumer; } @@ -126,6 +137,23 @@ public abstract class AbstractConsumerService actorMsgOpt = encodingService.decode(nfMsg.toByteArray()); + if (actorMsgOpt.isPresent()) { + TbActorMsg actorMsg = actorMsgOpt.get(); + if (actorMsg instanceof ComponentLifecycleMsg) { + ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg; + if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(new DeviceProfileId(componentLifecycleMsg.getEntityId().getId())); + } else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) { + deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId())); + } + } + log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg); + actorContext.tellWithHighPriority(actorMsg); + } + } + protected abstract void handleNotification(UUID id, TbProtoQueueMsg msg, TbCallback callback) throws Exception; @PreDestroy diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 23710cc5ea..f0af529be9 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.msg.gen.MsgProtos; +import org.thingsboard.server.common.msg.queue.RuleNodeInfo; import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.common.msg.queue.TbMsgCallback; @@ -84,6 +85,11 @@ public final class TbMsg implements Serializable { data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback()); } + public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId) { + return new TbMsg(origMsg.queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType, + origMsg.data, ruleChainId, null, origMsg.getCallback()); + } + public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index b833ef1b7a..331c17b9d7 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -194,6 +194,11 @@ message DeviceProfileUpdateMsg { bytes data = 1; } +message DeviceProfileDeleteMsg { + int64 profileIdMSB = 1; + int64 profileIdLSB = 2; +} + message SessionCloseNotificationProto { string message = 1; } @@ -485,4 +490,5 @@ message ToTransportMsg { ToDeviceRpcRequestMsg toDeviceRequest = 6; ToServerRpcResponseMsg toServerResponse = 7; DeviceProfileUpdateMsg deviceProfileUpdateMsg = 8; + DeviceProfileDeleteMsg deviceProfileDeleteMsg = 9; } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java index f34b76ade9..ee05e59010 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/TransportProfileCache.java @@ -23,7 +23,6 @@ import java.util.Optional; public interface TransportProfileCache { - DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody); DeviceProfile get(DeviceProfileId id); @@ -32,4 +31,6 @@ public interface TransportProfileCache { DeviceProfile put(ByteString profileBody); + void evict(DeviceProfileId id); + } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java index afa8e15e20..4d955de70c 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportProfileCache.java @@ -74,4 +74,9 @@ public class DefaultTransportProfileCache implements TransportProfileCache { return null; } } + + @Override + public void evict(DeviceProfileId id) { + deviceProfiles.remove(id); + } } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 8a5966f6af..72727e39de 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -629,6 +629,11 @@ public class DefaultTransportService implements TransportService { if (deviceProfile != null) { onProfileUpdate(deviceProfile); } + } else if (toSessionMsg.hasDeviceProfileDeleteMsg()) { + transportProfileCache.evict(new DeviceProfileId(new UUID( + toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdMSB(), + toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdLSB() + ))); } else { //TODO: should we notify the device actor about missed session? log.debug("[{}] Missing session.", sessionId);