Merge branch 'develop/3.2' of github.com:thingsboard/thingsboard into develop/3.2
This commit is contained in:
commit
4f280666e3
@ -105,6 +105,7 @@ import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.component.ComponentDiscoveryService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.TbClusterService;
|
||||
import org.thingsboard.server.service.security.model.SecurityUser;
|
||||
import org.thingsboard.server.service.security.permission.AccessControlService;
|
||||
@ -210,6 +211,9 @@ public abstract class BaseController {
|
||||
@Autowired
|
||||
protected TbQueueProducerProvider producerProvider;
|
||||
|
||||
@Autowired
|
||||
protected TbDeviceProfileCache deviceProfileCache;
|
||||
|
||||
@Value("${server.log_controller_error_stack_trace}")
|
||||
@Getter
|
||||
private boolean logControllerErrorStackTrace;
|
||||
|
||||
@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.security.permission.Operation;
|
||||
import org.thingsboard.server.service.security.permission.Resource;
|
||||
@ -86,13 +87,17 @@ public class DeviceProfileController extends BaseController {
|
||||
@ResponseBody
|
||||
public DeviceProfile saveDeviceProfile(@RequestBody DeviceProfile deviceProfile) throws ThingsboardException {
|
||||
try {
|
||||
boolean created = deviceProfile.getId() == null;
|
||||
deviceProfile.setTenantId(getTenantId());
|
||||
|
||||
checkEntity(deviceProfile.getId(), deviceProfile, Resource.DEVICE_PROFILE);
|
||||
|
||||
DeviceProfile savedDeviceProfile = checkNotNull(deviceProfileService.saveDeviceProfile(deviceProfile));
|
||||
|
||||
deviceProfileCache.put(savedDeviceProfile);
|
||||
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
|
||||
tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), savedDeviceProfile.getId(),
|
||||
created ? ComponentLifecycleEvent.CREATED : ComponentLifecycleEvent.UPDATED);
|
||||
|
||||
logEntityAction(savedDeviceProfile.getId(), savedDeviceProfile,
|
||||
null,
|
||||
@ -115,6 +120,10 @@ public class DeviceProfileController extends BaseController {
|
||||
DeviceProfileId deviceProfileId = new DeviceProfileId(toUUID(strDeviceProfileId));
|
||||
DeviceProfile deviceProfile = checkDeviceProfileId(deviceProfileId, Operation.DELETE);
|
||||
deviceProfileService.deleteDeviceProfile(getTenantId(), deviceProfileId);
|
||||
deviceProfileCache.evict(deviceProfileId);
|
||||
|
||||
tbClusterService.onDeviceProfileDelete(deviceProfile, null);
|
||||
tbClusterService.onEntityStateChange(deviceProfile.getTenantId(), deviceProfile.getId(), ComponentLifecycleEvent.DELETED);
|
||||
|
||||
logEntityAction(deviceProfileId, deviceProfile,
|
||||
null,
|
||||
@ -180,10 +189,10 @@ public class DeviceProfileController extends BaseController {
|
||||
@RequestMapping(value = "/deviceProfileInfos", params = {"pageSize", "page"}, method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
public PageData<DeviceProfileInfo> getDeviceProfileInfos(@RequestParam int pageSize,
|
||||
@RequestParam int page,
|
||||
@RequestParam(required = false) String textSearch,
|
||||
@RequestParam(required = false) String sortProperty,
|
||||
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
|
||||
@RequestParam int page,
|
||||
@RequestParam(required = false) String textSearch,
|
||||
@RequestParam(required = false) String sortProperty,
|
||||
@RequestParam(required = false) String sortOrder) throws ThingsboardException {
|
||||
try {
|
||||
PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
|
||||
return checkNotNull(deviceProfileService.findDeviceProfileInfos(getTenantId(), pageLink));
|
||||
|
||||
@ -0,0 +1,99 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.profile;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.dao.device.DeviceProfileService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
|
||||
|
||||
private final Lock deviceProfileFetchLock = new ReentrantLock();
|
||||
private final DeviceProfileService deviceProfileService;
|
||||
private final DeviceService deviceService;
|
||||
|
||||
private final ConcurrentMap<DeviceProfileId, DeviceProfile> deviceProfilesMap = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<DeviceId, DeviceProfileId> devicesMap = new ConcurrentHashMap<>();
|
||||
|
||||
public DefaultTbDeviceProfileCache(DeviceProfileService deviceProfileService, DeviceService deviceService) {
|
||||
this.deviceProfileService = deviceProfileService;
|
||||
this.deviceService = deviceService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) {
|
||||
DeviceProfile profile = deviceProfilesMap.get(deviceProfileId);
|
||||
if (profile == null) {
|
||||
deviceProfileFetchLock.lock();
|
||||
profile = deviceProfilesMap.get(deviceProfileId);
|
||||
if (profile == null) {
|
||||
try {
|
||||
profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
|
||||
if (profile != null) {
|
||||
deviceProfilesMap.put(deviceProfileId, profile);
|
||||
}
|
||||
} finally {
|
||||
deviceProfileFetchLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
return profile;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeviceProfile get(TenantId tenantId, DeviceId deviceId) {
|
||||
DeviceProfileId profileId = devicesMap.get(deviceId);
|
||||
if (profileId == null) {
|
||||
Device device = deviceService.findDeviceById(tenantId, deviceId);
|
||||
if (device != null) {
|
||||
profileId = device.getDeviceProfileId();
|
||||
devicesMap.put(deviceId, profileId);
|
||||
}
|
||||
}
|
||||
return get(tenantId, profileId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(DeviceProfile profile) {
|
||||
if (profile.getId() != null) {
|
||||
deviceProfilesMap.put(profile.getId(), profile);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(DeviceProfileId profileId) {
|
||||
deviceProfilesMap.remove(profileId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(DeviceId deviceId) {
|
||||
devicesMap.remove(deviceId);
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,35 @@
|
||||
/**
|
||||
* Copyright © 2016-2020 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.profile;
|
||||
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
public interface TbDeviceProfileCache {
|
||||
|
||||
DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId);
|
||||
|
||||
DeviceProfile get(TenantId tenantId, DeviceId deviceId);
|
||||
|
||||
void put(DeviceProfile profile);
|
||||
|
||||
void evict(DeviceProfileId id);
|
||||
|
||||
void evict(DeviceId id);
|
||||
|
||||
}
|
||||
@ -23,13 +23,17 @@ import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
@ -42,7 +46,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
|
||||
|
||||
import java.util.HashSet;
|
||||
@ -66,11 +70,13 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
private final TbQueueProducerProvider producerProvider;
|
||||
private final PartitionService partitionService;
|
||||
private final DataDecodingEncodingService encodingService;
|
||||
private final TbDeviceProfileCache deviceProfileCache;
|
||||
|
||||
public DefaultTbClusterService(TbQueueProducerProvider producerProvider, PartitionService partitionService, DataDecodingEncodingService encodingService) {
|
||||
public DefaultTbClusterService(TbQueueProducerProvider producerProvider, PartitionService partitionService, DataDecodingEncodingService encodingService, TbDeviceProfileCache deviceProfileCache) {
|
||||
this.producerProvider = producerProvider;
|
||||
this.partitionService = partitionService;
|
||||
this.encodingService = encodingService;
|
||||
this.deviceProfileCache = deviceProfileCache;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -126,6 +132,12 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
log.warn("[{}][{}] Received invalid message: {}", tenantId, entityId, tbMsg);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (entityId.getEntityType().equals(EntityType.DEVICE)) {
|
||||
tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceId(entityId.getId())));
|
||||
} else if (entityId.getEntityType().equals(EntityType.DEVICE_PROFILE)) {
|
||||
tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
|
||||
}
|
||||
}
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, entityId);
|
||||
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
|
||||
@ -137,6 +149,16 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
toRuleEngineMsgs.incrementAndGet();
|
||||
}
|
||||
|
||||
private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) {
|
||||
if (deviceProfile != null) {
|
||||
RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
if (targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId())) {
|
||||
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId);
|
||||
}
|
||||
}
|
||||
return tbMsg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pushNotificationToRuleEngine(String serviceId, FromDeviceRpcResponse response, TbQueueCallback callback) {
|
||||
TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_RULE_ENGINE, serviceId);
|
||||
@ -167,11 +189,27 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
|
||||
@Override
|
||||
public void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback) {
|
||||
log.trace("[{}][{}] Processing device profile [{}] event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName());
|
||||
log.trace("[{}][{}] Processing device profile [{}] change event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName());
|
||||
TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder()
|
||||
.setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build();
|
||||
broadcast(transportMsg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceProfileDelete(DeviceProfile deviceProfile, TbQueueCallback callback) {
|
||||
log.trace("[{}][{}] Processing device profile [{}] delete event", deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile.getName());
|
||||
TransportProtos.DeviceProfileDeleteMsg profileDeleteMsg = TransportProtos.DeviceProfileDeleteMsg.newBuilder()
|
||||
.setProfileIdMSB(deviceProfile.getId().getId().getMostSignificantBits())
|
||||
.setProfileIdLSB(deviceProfile.getId().getId().getLeastSignificantBits())
|
||||
.build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileDeleteMsg(profileDeleteMsg).build();
|
||||
broadcast(transportMsg);
|
||||
}
|
||||
|
||||
private void broadcast(ToTransportMsg transportMsg) {
|
||||
TbQueueProducer<TbProtoQueueMsg<ToTransportMsg>> toTransportNfProducer = producerProvider.getTransportNotificationsMsgProducer();
|
||||
Set<String> tbTransportServices = partitionService.getAllServiceIds(ServiceType.TB_TRANSPORT);
|
||||
TransportProtos.DeviceProfileUpdateMsg profileUpdateMsg = TransportProtos.DeviceProfileUpdateMsg.newBuilder().setData(ByteString.copyFrom(encodingService.encode(deviceProfile))).build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setDeviceProfileUpdateMsg(profileUpdateMsg).build();
|
||||
for (String transportServiceId : tbTransportServices) {
|
||||
TopicPartitionInfo tpi = partitionService.getNotificationsTopic(ServiceType.TB_TRANSPORT, transportServiceId);
|
||||
toTransportNfProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), transportMsg), null);
|
||||
|
||||
@ -21,10 +21,13 @@ import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.rule.engine.api.RpcError;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.MsgType;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
|
||||
@ -48,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
|
||||
import org.thingsboard.server.common.stats.StatsFactory;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
|
||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
|
||||
@ -92,8 +96,8 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, ActorSystemContext actorContext,
|
||||
DeviceStateService stateService, TbLocalSubscriptionService localSubscriptionService,
|
||||
SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
|
||||
TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory) {
|
||||
super(actorContext, encodingService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
|
||||
TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) {
|
||||
super(actorContext, encodingService, deviceProfileCache, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
|
||||
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
|
||||
this.stateService = stateService;
|
||||
this.localSubscriptionService = localSubscriptionService;
|
||||
@ -211,11 +215,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
log.trace("[{}] Forwarding message to RPC service {}", id, toCoreNotification.getFromDeviceRpcResponse());
|
||||
forwardToCoreRpcService(toCoreNotification.getFromDeviceRpcResponse(), callback);
|
||||
} else if (toCoreNotification.getComponentLifecycleMsg() != null && !toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
|
||||
Optional<TbActorMsg> actorMsg = encodingService.decode(toCoreNotification.getComponentLifecycleMsg().toByteArray());
|
||||
if (actorMsg.isPresent()) {
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
|
||||
actorContext.tellWithHighPriority(actorMsg.get());
|
||||
}
|
||||
handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg());
|
||||
callback.onSuccess();
|
||||
}
|
||||
if (statsEnabled) {
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.queue;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import com.google.protobuf.ProtocolStringList;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
@ -38,6 +39,7 @@ import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings;
|
||||
import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration;
|
||||
import org.thingsboard.server.queue.util.TbRuleEngineComponent;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.processing.*;
|
||||
import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
|
||||
import org.thingsboard.server.service.rpc.TbRuleEngineDeviceRpcService;
|
||||
@ -80,8 +82,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
TbRuleEngineQueueFactory tbRuleEngineQueueFactory, RuleEngineStatisticsService statisticsService,
|
||||
ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
|
||||
TbRuleEngineDeviceRpcService tbDeviceRpcService,
|
||||
StatsFactory statsFactory) {
|
||||
super(actorContext, encodingService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
|
||||
StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) {
|
||||
super(actorContext, encodingService, deviceProfileCache, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
|
||||
this.statisticsService = statisticsService;
|
||||
this.ruleEngineSettings = ruleEngineSettings;
|
||||
this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;
|
||||
@ -239,11 +241,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
protected void handleNotification(UUID id, TbProtoQueueMsg<ToRuleEngineNotificationMsg> msg, TbCallback callback) throws Exception {
|
||||
ToRuleEngineNotificationMsg nfMsg = msg.getValue();
|
||||
if (nfMsg.getComponentLifecycleMsg() != null && !nfMsg.getComponentLifecycleMsg().isEmpty()) {
|
||||
Optional<TbActorMsg> actorMsg = encodingService.decode(nfMsg.getComponentLifecycleMsg().toByteArray());
|
||||
if (actorMsg.isPresent()) {
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg.get());
|
||||
actorContext.tellWithHighPriority(actorMsg.get());
|
||||
}
|
||||
handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg());
|
||||
callback.onSuccess();
|
||||
} else if (nfMsg.hasFromDeviceRpcResponse()) {
|
||||
TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
|
||||
|
||||
@ -17,6 +17,7 @@ package org.thingsboard.server.service.queue;
|
||||
|
||||
import org.thingsboard.rule.engine.api.msg.ToDeviceActorNotificationMsg;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
@ -52,4 +53,5 @@ public interface TbClusterService {
|
||||
|
||||
void onDeviceProfileChange(DeviceProfile deviceProfile, TbQueueCallback callback);
|
||||
|
||||
void onDeviceProfileDelete(DeviceProfile deviceProfileId, TbQueueCallback callback);
|
||||
}
|
||||
|
||||
@ -15,23 +15,31 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.queue.processing;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.queue.TbQueueConsumer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
|
||||
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
import org.thingsboard.server.service.queue.TbPackCallback;
|
||||
import org.thingsboard.server.service.queue.TbPackProcessingContext;
|
||||
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
@ -51,12 +59,15 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
|
||||
protected final ActorSystemContext actorContext;
|
||||
protected final DataDecodingEncodingService encodingService;
|
||||
protected final TbDeviceProfileCache deviceProfileCache;
|
||||
|
||||
protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer;
|
||||
|
||||
public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
|
||||
public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
|
||||
TbDeviceProfileCache deviceProfileCache, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
|
||||
this.actorContext = actorContext;
|
||||
this.encodingService = encodingService;
|
||||
this.deviceProfileCache = deviceProfileCache;
|
||||
this.nfConsumer = nfConsumer;
|
||||
}
|
||||
|
||||
@ -126,6 +137,23 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
||||
});
|
||||
}
|
||||
|
||||
protected void handleComponentLifecycleMsg(UUID id, ByteString nfMsg) {
|
||||
Optional<TbActorMsg> actorMsgOpt = encodingService.decode(nfMsg.toByteArray());
|
||||
if (actorMsgOpt.isPresent()) {
|
||||
TbActorMsg actorMsg = actorMsgOpt.get();
|
||||
if (actorMsg instanceof ComponentLifecycleMsg) {
|
||||
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
|
||||
if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
deviceProfileCache.evict(new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||
deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
||||
}
|
||||
}
|
||||
log.trace("[{}] Forwarding message to App Actor {}", id, actorMsg);
|
||||
actorContext.tellWithHighPriority(actorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbCallback callback) throws Exception;
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.EntityIdFactory;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.RuleNodeId;
|
||||
import org.thingsboard.server.common.msg.gen.MsgProtos;
|
||||
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceQueue;
|
||||
import org.thingsboard.server.common.msg.queue.TbMsgCallback;
|
||||
|
||||
@ -84,6 +85,11 @@ public final class TbMsg implements Serializable {
|
||||
data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg origMsg, RuleChainId ruleChainId) {
|
||||
return new TbMsg(origMsg.queueName, origMsg.id, origMsg.ts, origMsg.type, origMsg.originator, origMsg.metaData, origMsg.dataType,
|
||||
origMsg.data, ruleChainId, null, origMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg newMsg(TbMsg tbMsg, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(tbMsg.getQueueName(), UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.getMetaData().copy(),
|
||||
tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, TbMsgCallback.EMPTY);
|
||||
|
||||
@ -194,6 +194,11 @@ message DeviceProfileUpdateMsg {
|
||||
bytes data = 1;
|
||||
}
|
||||
|
||||
message DeviceProfileDeleteMsg {
|
||||
int64 profileIdMSB = 1;
|
||||
int64 profileIdLSB = 2;
|
||||
}
|
||||
|
||||
message SessionCloseNotificationProto {
|
||||
string message = 1;
|
||||
}
|
||||
@ -485,4 +490,5 @@ message ToTransportMsg {
|
||||
ToDeviceRpcRequestMsg toDeviceRequest = 6;
|
||||
ToServerRpcResponseMsg toServerResponse = 7;
|
||||
DeviceProfileUpdateMsg deviceProfileUpdateMsg = 8;
|
||||
DeviceProfileDeleteMsg deviceProfileDeleteMsg = 9;
|
||||
}
|
||||
|
||||
@ -23,7 +23,6 @@ import java.util.Optional;
|
||||
|
||||
public interface TransportProfileCache {
|
||||
|
||||
|
||||
DeviceProfile getOrCreate(DeviceProfileId id, ByteString profileBody);
|
||||
|
||||
DeviceProfile get(DeviceProfileId id);
|
||||
@ -32,4 +31,6 @@ public interface TransportProfileCache {
|
||||
|
||||
DeviceProfile put(ByteString profileBody);
|
||||
|
||||
void evict(DeviceProfileId id);
|
||||
|
||||
}
|
||||
|
||||
@ -74,4 +74,9 @@ public class DefaultTransportProfileCache implements TransportProfileCache {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void evict(DeviceProfileId id) {
|
||||
deviceProfiles.remove(id);
|
||||
}
|
||||
}
|
||||
|
||||
@ -629,6 +629,11 @@ public class DefaultTransportService implements TransportService {
|
||||
if (deviceProfile != null) {
|
||||
onProfileUpdate(deviceProfile);
|
||||
}
|
||||
} else if (toSessionMsg.hasDeviceProfileDeleteMsg()) {
|
||||
transportProfileCache.evict(new DeviceProfileId(new UUID(
|
||||
toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdMSB(),
|
||||
toSessionMsg.getDeviceProfileDeleteMsg().getProfileIdLSB()
|
||||
)));
|
||||
} else {
|
||||
//TODO: should we notify the device actor about missed session?
|
||||
log.debug("[{}] Missing session.", sessionId);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user