diff --git a/application/src/main/data/upgrade/3.3.4/schema_update_device_profile.sql b/application/src/main/data/upgrade/3.3.4/schema_update_device_profile.sql deleted file mode 100644 index e0b0cc7f1a..0000000000 --- a/application/src/main/data/upgrade/3.3.4/schema_update_device_profile.sql +++ /dev/null @@ -1,49 +0,0 @@ --- --- Copyright © 2016-2022 The Thingsboard Authors --- --- Licensed under the Apache License, Version 2.0 (the "License"); --- you may not use this file except in compliance with the License. --- You may obtain a copy of the License at --- --- http://www.apache.org/licenses/LICENSE-2.0 --- --- Unless required by applicable law or agreed to in writing, software --- distributed under the License is distributed on an "AS IS" BASIS, --- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --- See the License for the specific language governing permissions and --- limitations under the License. --- - -ALTER TABLE device_profile - ADD COLUMN IF NOT EXISTS default_queue_id uuid; - -DO -$$ - BEGIN - IF EXISTS - (SELECT column_name - FROM information_schema.columns - WHERE table_name = 'device_profile' - AND column_name = 'default_queue_name' - ) - THEN - UPDATE device_profile - SET default_queue_id = q.id - FROM queue as q - WHERE default_queue_name = q.name; - END IF; - END -$$; - -DO -$$ - BEGIN - IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_default_queue_device_profile') THEN - ALTER TABLE device_profile - ADD CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue (id); - END IF; - END; -$$; - -ALTER TABLE device_profile - DROP COLUMN IF EXISTS default_queue_name; diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index 7e689e58fa..a38de7b839 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -508,13 +508,8 @@ public class ActorSystemContext { return partitionService.resolve(serviceType, tenantId, entityId); } - public TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId) { - return partitionService.resolve(serviceType, queueId, tenantId, entityId); - } - - @Deprecated public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) { - return partitionService.resolve(serviceType, tenantId, entityId, queueName); + return partitionService.resolve(serviceType, queueName, tenantId, entityId); } public String getServiceId() { diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index 3538800d38..d5460689b8 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -162,18 +162,11 @@ class DefaultTbContext implements TbContext { } @Override - @Deprecated public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer onFailure) { TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); enqueue(tpi, tbMsg, onFailure, onSuccess); } - @Override - public void enqueue(TbMsg tbMsg, QueueId queueId, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId); - enqueue(tpi, tbMsg, onFailure, onSuccess); - } - private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer onFailure, Runnable onSuccess) { if (!tbMsg.isValid()) { log.trace("[{}] Skip invalid message: {}", getTenantId(), tbMsg); @@ -223,35 +216,30 @@ class DefaultTbContext implements TbContext { } @Override - public void enqueueForTellNext(TbMsg tbMsg, QueueId queueId, String relationType, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId); - enqueueForTellNext(tpi, queueId, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); + public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer onFailure) { + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); + enqueueForTellNext(tpi, queueName, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); } @Override - public void enqueueForTellNext(TbMsg tbMsg, QueueId queueId, Set relationTypes, Runnable onSuccess, Consumer onFailure) { - TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId); - enqueueForTellNext(tpi, queueId, tbMsg, relationTypes, null, onSuccess, onFailure); + public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set relationTypes, Runnable onSuccess, Consumer onFailure) { + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName); + enqueueForTellNext(tpi, queueName, tbMsg, relationTypes, null, onSuccess, onFailure); } - private TopicPartitionInfo resolvePartition(TbMsg tbMsg, QueueId queueId) { - return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueId, getTenantId(), tbMsg.getOriginator()); - } - - @Deprecated private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) { return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); } private TopicPartitionInfo resolvePartition(TbMsg tbMsg) { - return resolvePartition(tbMsg, tbMsg.getQueueId()); + return resolvePartition(tbMsg, tbMsg.getQueueName()); } private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { - enqueueForTellNext(tpi, source.getQueueId(), source, relationTypes, failureMessage, onSuccess, onFailure); + enqueueForTellNext(tpi, source.getQueueName(), source, relationTypes, failureMessage, onSuccess, onFailure); } - private void enqueueForTellNext(TopicPartitionInfo tpi, QueueId queueId, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { + private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set relationTypes, String failureMessage, Runnable onSuccess, Consumer onFailure) { if (!source.isValid()) { log.trace("[{}] Skip invalid message: {}", getTenantId(), source); if (onFailure != null) { @@ -261,7 +249,7 @@ class DefaultTbContext implements TbContext { } RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); - TbMsg tbMsg = TbMsg.newMsg(source, queueId, ruleChainId, ruleNodeId); + TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId); TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder() .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) @@ -320,13 +308,13 @@ class DefaultTbContext implements TbContext { } @Override - public TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return newMsg(queueId, type, originator, null, metaData, data); + public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { + return newMsg(queueName, type, originator, null, metaData, data); } @Override - public TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { - return TbMsg.newMsg(queueId, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); } @Override @@ -340,17 +328,17 @@ class DefaultTbContext implements TbContext { public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) { RuleChainId ruleChainId = null; - QueueId queueId = null; + String queueName = null; if (device.getDeviceProfileId() != null) { DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().find(device.getDeviceProfileId()); if (deviceProfile == null) { log.warn("[{}] Device profile is null!", device.getDeviceProfileId()); } else { ruleChainId = deviceProfile.getDefaultRuleChainId(); - queueId = deviceProfile.getDefaultQueueId(); + queueName = deviceProfile.getDefaultQueueName(); } } - return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueId, ruleChainId); + return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueName, ruleChainId); } public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) { @@ -359,7 +347,7 @@ class DefaultTbContext implements TbContext { public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) { RuleChainId ruleChainId = null; - QueueId queueId = null; + String queueName = null; if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) { DeviceId deviceId = new DeviceId(alarm.getOriginator().getId()); DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId); @@ -367,10 +355,10 @@ class DefaultTbContext implements TbContext { log.warn("[{}] Device profile is null!", deviceId); } else { ruleChainId = deviceProfile.getDefaultRuleChainId(); - queueId = deviceProfile.getDefaultQueueId(); + queueName = deviceProfile.getDefaultQueueName(); } } - return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueId, ruleChainId); + return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId); } @Override @@ -382,9 +370,9 @@ class DefaultTbContext implements TbContext { return entityActionMsg(entity, id, ruleNodeId, action, null, null); } - public TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, QueueId queueId, RuleChainId ruleChainId) { + public TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, String queueName, RuleChainId ruleChainId) { try { - return TbMsg.newMsg(queueId, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null); + return TbMsg.newMsg(queueName, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null); } catch (JsonProcessingException | IllegalArgumentException e) { throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e); } diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java index add500f058..b4911650f3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleChainActorMessageProcessor.java @@ -293,7 +293,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor ruleNodeRelations = nodeRoutes.get(originatorNodeId); if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceProfileMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceProfileMsgConstructor.java index 3920e567f9..4e12fa3366 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceProfileMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/DeviceProfileMsgConstructor.java @@ -48,9 +48,8 @@ public class DeviceProfileMsgConstructor { // builder.setDefaultRuleChainIdMSB(deviceProfile.getDefaultRuleChainId().getId().getMostSignificantBits()) // .setDefaultRuleChainIdLSB(deviceProfile.getDefaultRuleChainId().getId().getLeastSignificantBits()); // } - if (deviceProfile.getDefaultQueueId() != null) { - builder.setDefaultQueueIdMSB(deviceProfile.getDefaultQueueId().getId().getMostSignificantBits()) - .setDefaultQueueIdLSB(deviceProfile.getDefaultQueueId().getId().getLeastSignificantBits()); + if (deviceProfile.getDefaultQueueName() != null) { + builder.setDefaultQueueName(deviceProfile.getDefaultQueueName()); } if (deviceProfile.getDescription() != null) { builder.setDescription(deviceProfile.getDescription()); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index f30111e1ef..ad6446cb95 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -158,21 +158,21 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { return metaData; } - private Pair getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) { + private Pair getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) { if (EntityType.DEVICE.equals(entityId.getEntityType())) { DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId())); RuleChainId ruleChainId; - QueueId queueId; + String queueName; if (deviceProfile == null) { log.warn("[{}] Device profile is null!", entityId); ruleChainId = null; - queueId = null; + queueName = null; } else { ruleChainId = deviceProfile.getDefaultRuleChainId(); - queueId = deviceProfile.getDefaultQueueId(); + queueName = deviceProfile.getDefaultQueueName(); } - return new ImmutablePair<>(queueId, ruleChainId); + return new ImmutablePair<>(queueName, ruleChainId); } else { return new ImmutablePair<>(null, null); } @@ -183,10 +183,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); metaData.putValue("ts", tsKv.getTs() + ""); - Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - QueueId queueId = defaultQueueAndRuleChain.getKey(); - RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); - TbMsg tbMsg = TbMsg.newMsg(queueId, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { @@ -206,10 +204,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); - Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - QueueId queueId = defaultQueueAndRuleChain.getKey(); - RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); - TbMsg tbMsg = TbMsg.newMsg(queueId, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { @@ -233,10 +229,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable List keys) { - Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); - QueueId queueId = defaultQueueAndRuleChain.getKey(); - RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); - TbMsg tbMsg = TbMsg.newMsg(queueId, DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), ruleChainId, null); + var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); + TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null); tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index ad0a91e0eb..4355d998bb 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -48,13 +48,11 @@ import java.util.stream.Collectors; @TbCoreComponent @AllArgsConstructor public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService { - private static final String MAIN = "Main"; private static final long DELETE_DELAY = 30; private final QueueService queueService; private final TbClusterService tbClusterService; private final TbQueueAdmin tbQueueAdmin; - private final DeviceProfileService deviceProfileService; private final SchedulerComponent scheduler; @Override @@ -205,35 +203,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb } tenantIds.forEach(tenantId -> { - Map> deviceProfileQueues; - - if (oldTenantProfile != null && !newTenantProfile.getId().equals(oldTenantProfile.getId()) || !toRemove.isEmpty()) { - List deviceProfiles = deviceProfileService.findDeviceProfiles(tenantId, new PageLink(Integer.MAX_VALUE)).getData(); - deviceProfileQueues = deviceProfiles.stream() - .filter(dp -> dp.getDefaultQueueId() != null) - .collect(Collectors.groupingBy(DeviceProfile::getDefaultQueueId)); - } else { - deviceProfileQueues = Collections.emptyMap(); - } - - Map createdQueues = toCreate.stream() - .map(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))) - .collect(Collectors.toMap(Queue::getName, Queue::getId)); - - // assigning created queues to device profiles instead of system queues - if (oldTenantProfile != null && !oldTenantProfile.isIsolatedTbRuleEngine()) { - deviceProfileQueues.forEach((queueId, list) -> { - Queue queue = queueService.findQueueById(TenantId.SYS_TENANT_ID, queueId); - QueueId queueIdToAssign = createdQueues.get(queue.getName()); - if (queueIdToAssign == null) { - queueIdToAssign = createdQueues.get(MAIN); - } - for (DeviceProfile deviceProfile : list) { - deviceProfile.setDefaultQueueId(queueIdToAssign); - saveDeviceProfile(deviceProfile); - } - }); - } + toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key)))); toUpdate.forEach(key -> { Queue queueToUpdate = new Queue(tenantId, newQueues.get(key)); @@ -241,9 +211,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb queueToUpdate.setId(foundQueue.getId()); queueToUpdate.setCreatedTime(foundQueue.getCreatedTime()); - if (queueToUpdate.equals(foundQueue)) { - //Queue not changed - } else { + if (!queueToUpdate.equals(foundQueue)) { saveQueue(queueToUpdate); } }); @@ -251,26 +219,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb toRemove.forEach(q -> { Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q); QueueId queueIdForRemove = queue.getId(); - if (deviceProfileQueues.containsKey(queueIdForRemove)) { - Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, q); - if (foundQueue == null || queue.equals(foundQueue)) { - foundQueue = queueService.findQueueByTenantIdAndName(tenantId, MAIN); - } - QueueId newQueueId = foundQueue.getId(); - deviceProfileQueues.get(queueIdForRemove).stream() - .peek(dp -> dp.setDefaultQueueId(newQueueId)) - .forEach(this::saveDeviceProfile); - } deleteQueue(tenantId, queueIdForRemove); }); }); } - //TODO: remove after implementing TbDeviceProfileService - private void saveDeviceProfile(DeviceProfile deviceProfile) { - DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile); - tbClusterService.onDeviceProfileChange(savedDeviceProfile, null); - tbClusterService.broadcastEntityStateChangeEvent(deviceProfile.getTenantId(), savedDeviceProfile.getId(), ComponentLifecycleEvent.UPDATED); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index e428d55d91..d22039683d 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -589,10 +589,6 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService } catch (Exception e) { } - log.info("Updating device profiles..."); - schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql"); - loadSql(schemaUpdateFile, conn); - log.info("Updating schema settings..."); conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;"); log.info("Schema updated."); diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index 9bfa86b3ba..cb560bb5ff 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -158,7 +158,6 @@ public class DefaultDataUpdateService implements DataUpdateService { log.info("Updating data from version 3.3.4 to 3.4.0 ..."); rateLimitsUpdater.updateEntities(); tenantsProfileQueueConfigurationUpdater.updateEntities(); - checkPointRuleNodesUpdater.updateEntities(); break; default: throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion); @@ -629,47 +628,4 @@ public class DefaultDataUpdateService implements DataUpdateService { return mainQueueConfiguration; } - private final PaginatedUpdater checkPointRuleNodesUpdater = - new PaginatedUpdater<>() { - - @Override - protected String getName() { - return "Checkpoint rule nodes updater"; - } - - @Override - protected boolean forceReportTotal() { - return true; - } - - @Override - protected PageData findEntities(String id, PageLink pageLink) { - return ruleChainService.findAllRuleNodesByType("org.thingsboard.rule.engine.flow.TbCheckpointNode", pageLink); - } - - @Override - protected void updateEntity(RuleNode ruleNode) { - updateCheckPointRuleNodeConfiguration(ruleNode); - } - }; - - private void updateCheckPointRuleNodeConfiguration(RuleNode node) { - try { - ObjectNode configuration = (ObjectNode) node.getConfiguration(); - JsonNode queueNameNode = configuration.remove("queueName"); - if (queueNameNode != null) { - RuleChain ruleChain = this.ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, node.getRuleChainId()); - TenantId tenantId = ruleChain.getTenantId(); - Map queues = - queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId)); - String queueName = queueNameNode.asText(); - QueueId queueId = queues.get(queueName); - configuration.put("queueId", queueId != null ? queueId.toString() : ""); - ruleChainService.saveRuleNode(tenantId, node); - } - } catch (Exception e) { - log.error("Failed to update checkpoint rule node configuration name=["+node.getName()+"], id=["+ node.getId().getId() +"]", e); - } - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 9e60f69620..062e4128d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -181,7 +181,7 @@ public class DefaultTbClusterService implements TbClusterService { tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId()))); } } - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueId(), tenantId, entityId); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId); log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) @@ -194,16 +194,16 @@ public class DefaultTbClusterService implements TbClusterService { private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) { if (deviceProfile != null) { RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId(); - QueueId targetQueueId = deviceProfile.getDefaultQueueId(); + String targetQueueName = deviceProfile.getDefaultQueueName(); boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId()); - boolean isQueueTransform = targetQueueId != null && !targetQueueId.equals(tbMsg.getQueueId()); + boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName()); if (isRuleChainTransform && isQueueTransform) { - tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueId); + tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueName); } else if (isRuleChainTransform) { tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId); } else if (isQueueTransform) { - tbMsg = TbMsg.transformMsg(tbMsg, targetQueueId); + tbMsg = TbMsg.transformMsg(tbMsg, targetQueueName); } } return tbMsg; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java index 92dd4fcf5c..3c7552ff9b 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbRuleEngineConsumerService.java @@ -274,7 +274,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS); - TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getId(), timeout, ctx); + TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx); if (timeout) { printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout"); } @@ -339,7 +339,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< new TbMsgPackCallback(id, tenantId, ctx); try { if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) { - forwardToRuleEngineActor(configuration.getId(), tenantId, toRuleEngineMsg, callback); + forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback); } else { callback.onSuccess(); } @@ -353,7 +353,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< log.info("{} to process [{}] messages", prefix, map.size()); for (Map.Entry> pending : map.entrySet()) { ToRuleEngineMsg tmp = pending.getValue().getValue(); - TbMsg tmpMsg = TbMsg.fromBytes(configuration.getId(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); + TbMsg tmpMsg = TbMsg.fromBytes(configuration.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY); RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey()); if (printAll) { log.trace("[{}] {} to process message: {}, Last Rule Node: {}", TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo); @@ -461,8 +461,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService< partitionService.removeQueue(queueDeleteMsg); } - private void forwardToRuleEngineActor(QueueId queueId, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { - TbMsg tbMsg = TbMsg.fromBytes(queueId, toRuleEngineMsg.getTbMsg().toByteArray(), callback); + private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) { + TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback); QueueToRuleEngineMsg msg; ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList(); Set relationTypes = null; diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java index 2e3fedb766..001f4a4c2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingResult.java @@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentMap; public class TbRuleEngineProcessingResult { @Getter - private final QueueId queueId; + private final String queueName; @Getter private final boolean success; @Getter @@ -37,8 +37,8 @@ public class TbRuleEngineProcessingResult { @Getter private final TbMsgPackProcessingContext ctx; - public TbRuleEngineProcessingResult(QueueId queueId, boolean timeout, TbMsgPackProcessingContext ctx) { - this.queueId = queueId; + public TbRuleEngineProcessingResult(String queueName, boolean timeout, TbMsgPackProcessingContext ctx) { + this.queueName = queueName; this.timeout = timeout; this.ctx = ctx; this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty(); diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java index 2191c38017..e50d560d18 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/TbRuleEngineProcessingStrategyFactory.java @@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory { } log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size()); if (log.isTraceEnabled()) { - toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } if (pauseBetweenRetries > 0) { try { @@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory { log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size()); } if (log.isTraceEnabled()) { - result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } if (log.isTraceEnabled()) { - result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); + result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY))); } return new TbRuleEngineProcessingDecision(true, null); } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java index 4c0b80b149..72be8e6bf7 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DeviceProfileImportService.java @@ -53,7 +53,6 @@ public class DeviceProfileImportService extends BaseEntityImportService pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&", - new TypeReference<>() {}, pageLink); - Queue differentQueue = pageData.getData().get(0); - - loginTenantAdmin(); - - DeviceProfile deviceProfile = this.createDeviceProfile("Device Profile"); - deviceProfile.setDefaultQueueId(differentQueue.getId()); - doPost("/api/deviceProfile", deviceProfile).andExpect(status().isBadRequest()) - .andExpect(statusReason(containsString("Can't assign queue from different tenant!"))); - } - @Test public void testSaveDeviceProfileWithFirmwareFromDifferentTenant() throws Exception { loginDifferentTenant(); diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index c0276df82f..4bcb9c6d59 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -163,7 +163,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { private Tenant savedTenant; private TenantId tenantId; private User tenantAdmin; - private QueueId defaultQueueId; private DeviceProfile thermostatDeviceProfile; @@ -186,8 +185,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { tenantId = savedTenant.getId(); Assert.assertNotNull(savedTenant); - defaultQueueId = getRandomQueueId(); - tenantAdmin = new User(); tenantAdmin.setAuthority(Authority.TENANT_ADMIN); tenantAdmin.setTenantId(savedTenant.getId()); @@ -378,7 +375,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { @Test public void testDeviceProfiles() throws Exception { // 1 - DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null, defaultQueueId); + DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null); extendDeviceProfileData(deviceProfile); edgeImitator.expectMessageAmount(1); deviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class); @@ -389,8 +386,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType()); Assert.assertEquals(deviceProfileUpdateMsg.getIdMSB(), deviceProfile.getUuidId().getMostSignificantBits()); Assert.assertEquals(deviceProfileUpdateMsg.getIdLSB(), deviceProfile.getUuidId().getLeastSignificantBits()); - Assert.assertEquals(defaultQueueId.getId().getMostSignificantBits(), deviceProfileUpdateMsg.getDefaultQueueIdMSB()); - Assert.assertEquals(defaultQueueId.getId().getLeastSignificantBits(), deviceProfileUpdateMsg.getDefaultQueueIdLSB()); // 2 edgeImitator.expectMessageAmount(1); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java index eb2881e63e..316780550c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceProfile.java @@ -75,8 +75,6 @@ public class DeviceProfile extends SearchTextBased implements H @ApiModelProperty(position = 6, value = "Reference to the dashboard. Used in the mobile application to open the default dashboard when user navigates to device details.") private DashboardId defaultDashboardId; - @JsonIgnore - private QueueId defaultQueueId; @NoXss @ApiModelProperty(position = 8, value = "Rule engine queue name. " + "If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " + @@ -114,7 +112,6 @@ public class DeviceProfile extends SearchTextBased implements H this.isDefault = deviceProfile.isDefault(); this.defaultRuleChainId = deviceProfile.getDefaultRuleChainId(); this.defaultDashboardId = deviceProfile.getDefaultDashboardId(); - this.defaultQueueId = deviceProfile.getDefaultQueueId(); this.defaultQueueName = deviceProfile.getDefaultQueueName(); this.setProfileData(deviceProfile.getProfileData()); this.provisionDeviceKey = deviceProfile.getProvisionDeviceKey(); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 30fdd7053f..4b132d8c76 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -216,8 +216,6 @@ message DeviceProfileUpdateMsg { bytes profileDataBytes = 13; optional string provisionDeviceKey = 14; optional bytes image = 15; - int64 defaultQueueIdMSB = 16; - int64 defaultQueueIdLSB = 17; } message DeviceCredentialsUpdateMsg { diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index bb3be80629..f8eb8c663e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -43,7 +43,7 @@ import java.util.UUID; @Slf4j public final class TbMsg implements Serializable { - private final QueueId queueId; + private final String queueName; private final UUID id; private final long ts; private final String type; @@ -67,12 +67,12 @@ public final class TbMsg implements Serializable { return ctx.getAndIncrementRuleNodeCounter(); } - public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return newMsg(queueId, type, originator, null, metaData, data, ruleChainId, ruleNodeId); + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return newMsg(queueName, type, originator, null, metaData, data, ruleChainId, ruleNodeId); } - public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueId, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY); } @@ -87,12 +87,12 @@ public final class TbMsg implements Serializable { // REALLY NEW MSG - public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return newMsg(queueId, type, originator, null, metaData, data); + public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) { + return newMsg(queueName, type, originator, null, metaData, data); } - public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { - return new TbMsg(queueId, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, + public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) { + return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId, metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY); } @@ -118,40 +118,40 @@ public final class TbMsg implements Serializable { } public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { - return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType, data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback); } public static TbMsg transformMsg(TbMsg tbMsg, CustomerId customerId) { - return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback()); } public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) { - return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback()); } - public static TbMsg transformMsg(TbMsg tbMsg, QueueId queueId) { - return new TbMsg(queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + public static TbMsg transformMsg(TbMsg tbMsg, String queueName) { + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ctx.copy(), tbMsg.getCallback()); } - public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, QueueId queueId) { - return new TbMsg(queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, + public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) { + return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType, tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback()); } //used for enqueueForTellNext - public static TbMsg newMsg(TbMsg tbMsg, QueueId queueId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { - return new TbMsg(queueId, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(), + public static TbMsg newMsg(TbMsg tbMsg, String queueName, RuleChainId ruleChainId, RuleNodeId ruleNodeId) { + return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(), tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY); } - private TbMsg(QueueId queueId, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, + private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) { this.id = id; - this.queueId = queueId; + this.queueName = queueName; if (ts > 0) { this.ts = ts; } else { @@ -220,7 +220,7 @@ public final class TbMsg implements Serializable { return builder.build().toByteArray(); } - public static TbMsg fromBytes(QueueId queueId, byte[] data, TbMsgCallback callback) { + public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) { try { MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data); TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap()); @@ -247,7 +247,7 @@ public final class TbMsg implements Serializable { } TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()]; - return new TbMsg(queueId, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId, + return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId, metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, ctx, callback); } catch (InvalidProtocolBufferException e) { throw new IllegalStateException("Could not parse protobuf for TbMsg", e); @@ -259,12 +259,12 @@ public final class TbMsg implements Serializable { } public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) { - return new TbMsg(this.queueId, msgId, this.ts, this.type, this.originator, this.customerId, + return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, null, this.ctx, callback); } public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) { - return new TbMsg(this.queueId, msgId, this.ts, this.type, this.originator, this.customerId, + return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId, this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback); } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 809c08ac6f..16ed860f52 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -68,8 +68,6 @@ public class HashPartitionService implements PartitionService { private final TenantRoutingInfoService tenantRoutingInfoService; private final QueueRoutingInfoService queueRoutingInfoService; - private final ConcurrentMap queuesById = new ConcurrentHashMap<>(); - private ConcurrentMap> myPartitions = new ConcurrentHashMap<>(); private final ConcurrentMap partitionTopicsMap = new ConcurrentHashMap<>(); @@ -121,7 +119,6 @@ public class HashPartitionService implements PartitionService { QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue); partitionTopicsMap.put(queueKey, queue.getQueueTopic()); partitionSizesMap.put(queueKey, queue.getPartitions()); - queuesById.put(queue.getQueueId(), queue); }); } @@ -165,8 +162,6 @@ public class HashPartitionService implements PartitionService { public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId); - QueueRoutingInfo queue = new QueueRoutingInfo(queueUpdateMsg); - queuesById.put(queue.getQueueId(), queue); partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic()); partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions()); myPartitions.remove(queueKey); @@ -176,7 +171,6 @@ public class HashPartitionService implements PartitionService { public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) { TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB())); QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId); - queuesById.remove(new QueueId(new UUID(queueDeleteMsg.getQueueIdMSB(), queueDeleteMsg.getQueueIdLSB()))); myPartitions.remove(queueKey); partitionTopicsMap.remove(queueKey); partitionSizesMap.remove(queueKey); @@ -185,9 +179,7 @@ public class HashPartitionService implements PartitionService { } @Override - @Deprecated - public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId, String queueName) { - log.warn("This method is deprecated and will be removed!!!"); + public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) { TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId); QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId); if (!partitionSizesMap.containsKey(queueKey)) { @@ -201,32 +193,6 @@ public class HashPartitionService implements PartitionService { return resolve(serviceType, null, tenantId, entityId); } - @Override - public TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId) { - QueueKey queueKey; - if (queueId == null) { - queueKey = getMainQueueKey(serviceType, tenantId); - } else { - QueueRoutingInfo queueRoutingInfo = queuesById.get(queueId); - - if (queueRoutingInfo == null) { - log.debug("Queue was removed but still used in CheckPoint rule node. [{}][{}]", tenantId, entityId); - queueKey = getMainQueueKey(serviceType, tenantId); - } else if (!queueRoutingInfo.getTenantId().equals(getIsolatedOrSystemTenantId(serviceType, tenantId))) { - log.debug("Tenant profile was changed but CheckPoint rule node still uses the queue from system level. [{}][{}]", tenantId, entityId); - queueKey = getMainQueueKey(serviceType, tenantId); - } else { - queueKey = new QueueKey(serviceType, queueRoutingInfo); - } - } - - return resolve(queueKey, entityId); - } - - private QueueKey getMainQueueKey(ServiceType serviceType, TenantId tenantId) { - return new QueueKey(serviceType, getIsolatedOrSystemTenantId(serviceType, tenantId)); - } - private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { int hash = hashFunction.newHasher() .putLong(entityId.getId().getMostSignificantBits()) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java index 3844bf02c9..f65a83b287 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/PartitionService.java @@ -32,13 +32,10 @@ import java.util.UUID; */ public interface PartitionService { - @Deprecated - TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId, String queueName); + TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId); TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId); - TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId); - /** * Received from the Discovery service when network topology is changed. * @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo} diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 68f8db272b..a63e4a01a2 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -1088,7 +1088,7 @@ public class DefaultTransportService implements TransportService { } private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueId(), tenantId, tbMsg.getOriginator()); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator()); if (log.isTraceEnabled()) { log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg); } @@ -1105,18 +1105,18 @@ public class DefaultTransportService implements TransportService { DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB())); DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId); RuleChainId ruleChainId; - QueueId queueId; + String queueName; if (deviceProfile == null) { log.warn("[{}] Device profile is null!", deviceProfileId); ruleChainId = null; - queueId = null; + queueName = null; } else { ruleChainId = deviceProfile.getDefaultRuleChainId(); - queueId = deviceProfile.getDefaultQueueId(); + queueName = deviceProfile.getDefaultQueueName(); } - TbMsg tbMsg = TbMsg.newMsg(queueId, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null); + TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null); sendToRuleEngine(tenantId, tbMsg, callback); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java index 4cc019a206..306f99e2fb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java @@ -120,12 +120,6 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService impl @Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_DASHBOARD_ID_PROPERTY) private UUID defaultDashboardId; - @Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_QUEUE_ID_PROPERTY) - private UUID defaultQueueId; - @Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_QUEUE_NAME_PROPERTY) private String defaultQueueName; @@ -137,9 +134,6 @@ public final class DeviceProfileEntity extends BaseSqlEntity impl this.defaultDashboardId = deviceProfile.getDefaultDashboardId().getId(); } this.defaultQueueName = deviceProfile.getDefaultQueueName(); - if (deviceProfile.getDefaultQueueId() != null) { - this.defaultQueueId = deviceProfile.getDefaultQueueId().getId(); - } this.provisionDeviceKey = deviceProfile.getProvisionDeviceKey(); if (deviceProfile.getFirmwareId() != null) { this.firmwareId = deviceProfile.getFirmwareId().getId(); @@ -188,9 +182,6 @@ public final class DeviceProfileEntity extends BaseSqlEntity impl if (defaultDashboardId != null) { deviceProfile.setDefaultDashboardId(new DashboardId(defaultDashboardId)); } - if (defaultQueueId != null) { - deviceProfile.setDefaultQueueId(new QueueId(defaultQueueId)); - } deviceProfile.setProvisionDeviceKey(provisionDeviceKey); if (firmwareId != null) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/validator/DeviceProfileDataValidator.java b/dao/src/main/java/org/thingsboard/server/dao/service/validator/DeviceProfileDataValidator.java index eec7bc8bc8..9ebc8e96cc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/validator/DeviceProfileDataValidator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/validator/DeviceProfileDataValidator.java @@ -126,16 +126,11 @@ public class DeviceProfileDataValidator extends AbstractHasOtaPackageValidator onFailure); - /** * Puts new message to custom queue for processing * * @param msg - message */ - void enqueue(TbMsg msg, QueueId queueId, Runnable onSuccess, Consumer onFailure); + void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer onFailure); void enqueueForTellFailure(TbMsg msg, String failureMessage); @@ -162,15 +159,15 @@ public interface TbContext { void enqueueForTellNext(TbMsg msg, Set relationTypes, Runnable onSuccess, Consumer onFailure); - void enqueueForTellNext(TbMsg msg, QueueId queueId, String relationType, Runnable onSuccess, Consumer onFailure); + void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer onFailure); - void enqueueForTellNext(TbMsg msg, QueueId queueId, Set relationTypes, Runnable onSuccess, Consumer onFailure); + void enqueueForTellNext(TbMsg msg, String queueName, Set relationTypes, Runnable onSuccess, Consumer onFailure); void ack(TbMsg tbMsg); - TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data); + TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data); - TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data); + TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data); TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java index 7b4f1792c4..73e03892c9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java @@ -138,7 +138,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode { } private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) { - ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueId(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS); + ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS); } private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java index f08b9c06a0..22bed5b4e4 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbMsgCountNode.java @@ -77,7 +77,7 @@ public class TbMsgCountNode implements TbNode { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay)); - TbMsg tbMsg = TbMsg.newMsg(msg.getQueueId(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson)); + TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson)); ctx.enqueueForTellNext(tbMsg, SUCCESS); scheduleTickMsg(ctx, tbMsg); } else { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java index ff69212c67..3fa2b5d64e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/flow/TbCheckpointNode.java @@ -41,17 +41,17 @@ import java.util.UUID; ) public class TbCheckpointNode implements TbNode { - private QueueId queueId; + private String queueName; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { TbCheckpointNodeConfiguration config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class); - this.queueId = new QueueId(UUID.fromString(config.getQueueId())); + this.queueName = config.getQueueName(); } @Override public void onMsg(TbContext ctx, TbMsg msg) { - ctx.enqueueForTellNext(msg, queueId, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error)); + ctx.enqueueForTellNext(msg, queueName, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error)); } @Override diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java index db41c0f6d5..831fd3c08c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmState.java @@ -60,7 +60,7 @@ class AlarmState { private volatile Alarm currentAlarm; private volatile boolean initialFetchDone; private volatile TbMsgMetaData lastMsgMetaData; - private volatile QueueId lastMsgQueueId; + private volatile String lastMsgQueueName; private volatile DataSnapshot dataSnapshot; private final DynamicPredicateValueCtx dynamicPredicateValueCtx; @@ -74,7 +74,7 @@ class AlarmState { public boolean process(TbContext ctx, TbMsg msg, DataSnapshot data, SnapshotUpdate update) throws ExecutionException, InterruptedException { initCurrentAlarm(ctx); lastMsgMetaData = msg.getMetaData(); - lastMsgQueueId = msg.getQueueId(); + lastMsgQueueName = msg.getQueueName(); this.dataSnapshot = data; try { return createOrClearAlarms(ctx, msg, data, update, AlarmRuleState::eval); @@ -195,7 +195,7 @@ class AlarmState { metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); } setAlarmConditionMetadata(ruleState, metaData); - TbMsg newMsg = ctx.newMsg(lastMsgQueueId != null ? lastMsgQueueId : null, "ALARM", + TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : null, "ALARM", originator, msg != null ? msg.getCustomerId() : null, metaData, data); ctx.enqueueForTellNext(newMsg, relationType); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java index 265475c0d9..45b45e91f9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/rpc/TbSendRPCRequestNode.java @@ -116,10 +116,10 @@ public class TbSendRPCRequestNode implements TbNode { ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> { if (ruleEngineDeviceRpcResponse.getError().isEmpty()) { - TbMsg next = ctx.newMsg(msg.getQueueId(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}")); ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS); } else { - TbMsg next = ctx.newMsg(msg.getQueueId(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); + TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name())); ctx.enqueueForTellFailure(next, ruleEngineDeviceRpcResponse.getError().get().name()); } });