Merge pull request #6926 from thingsboard/bug/queue-id-to-name

[3.4] Revert queue id to queue name in device profiles
This commit is contained in:
Andrew Shvayka 2022-07-12 13:45:52 +03:00 committed by GitHub
commit 4a5a5f5d98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 157 additions and 472 deletions

View File

@ -70,3 +70,4 @@ CREATE TABLE IF NOT EXISTS user_auth_settings (
two_fa_settings varchar
);
CREATE INDEX IF NOT EXISTS idx_api_usage_state_entity_id ON api_usage_state(entity_id);

View File

@ -1,49 +0,0 @@
--
-- Copyright © 2016-2022 The Thingsboard Authors
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
ALTER TABLE device_profile
ADD COLUMN IF NOT EXISTS default_queue_id uuid;
DO
$$
BEGIN
IF EXISTS
(SELECT column_name
FROM information_schema.columns
WHERE table_name = 'device_profile'
AND column_name = 'default_queue_name'
)
THEN
UPDATE device_profile
SET default_queue_id = q.id
FROM queue as q
WHERE default_queue_name = q.name;
END IF;
END
$$;
DO
$$
BEGIN
IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_default_queue_device_profile') THEN
ALTER TABLE device_profile
ADD CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue (id);
END IF;
END;
$$;
ALTER TABLE device_profile
DROP COLUMN IF EXISTS default_queue_name;

View File

@ -508,13 +508,8 @@ public class ActorSystemContext {
return partitionService.resolve(serviceType, tenantId, entityId);
}
public TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId) {
return partitionService.resolve(serviceType, queueId, tenantId, entityId);
}
@Deprecated
public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
return partitionService.resolve(serviceType, tenantId, entityId, queueName);
return partitionService.resolve(serviceType, queueName, tenantId, entityId);
}
public String getServiceId() {

View File

@ -162,18 +162,11 @@ class DefaultTbContext implements TbContext {
}
@Override
@Deprecated
public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueue(tpi, tbMsg, onFailure, onSuccess);
}
@Override
public void enqueue(TbMsg tbMsg, QueueId queueId, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId);
enqueue(tpi, tbMsg, onFailure, onSuccess);
}
private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer<Throwable> onFailure, Runnable onSuccess) {
if (!tbMsg.isValid()) {
log.trace("[{}] Skip invalid message: {}", getTenantId(), tbMsg);
@ -223,35 +216,30 @@ class DefaultTbContext implements TbContext {
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, QueueId queueId, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId);
enqueueForTellNext(tpi, queueId, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueueForTellNext(tpi, queueName, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
}
@Override
public void enqueueForTellNext(TbMsg tbMsg, QueueId queueId, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId);
enqueueForTellNext(tpi, queueId, tbMsg, relationTypes, null, onSuccess, onFailure);
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
enqueueForTellNext(tpi, queueName, tbMsg, relationTypes, null, onSuccess, onFailure);
}
private TopicPartitionInfo resolvePartition(TbMsg tbMsg, QueueId queueId) {
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueId, getTenantId(), tbMsg.getOriginator());
}
@Deprecated
private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
}
private TopicPartitionInfo resolvePartition(TbMsg tbMsg) {
return resolvePartition(tbMsg, tbMsg.getQueueId());
return resolvePartition(tbMsg, tbMsg.getQueueName());
}
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
enqueueForTellNext(tpi, source.getQueueId(), source, relationTypes, failureMessage, onSuccess, onFailure);
enqueueForTellNext(tpi, source.getQueueName(), source, relationTypes, failureMessage, onSuccess, onFailure);
}
private void enqueueForTellNext(TopicPartitionInfo tpi, QueueId queueId, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
if (!source.isValid()) {
log.trace("[{}] Skip invalid message: {}", getTenantId(), source);
if (onFailure != null) {
@ -261,7 +249,7 @@ class DefaultTbContext implements TbContext {
}
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
TbMsg tbMsg = TbMsg.newMsg(source, queueId, ruleChainId, ruleNodeId);
TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId);
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
@ -320,13 +308,13 @@ class DefaultTbContext implements TbContext {
}
@Override
public TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return newMsg(queueId, type, originator, null, metaData, data);
public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return newMsg(queueName, type, originator, null, metaData, data);
}
@Override
public TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
return TbMsg.newMsg(queueId, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
}
@Override
@ -340,17 +328,17 @@ class DefaultTbContext implements TbContext {
public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) {
RuleChainId ruleChainId = null;
QueueId queueId = null;
String queueName = null;
if (device.getDeviceProfileId() != null) {
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().find(device.getDeviceProfileId());
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", device.getDeviceProfileId());
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueId = deviceProfile.getDefaultQueueId();
queueName = deviceProfile.getDefaultQueueName();
}
}
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueId, ruleChainId);
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueName, ruleChainId);
}
public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) {
@ -359,7 +347,7 @@ class DefaultTbContext implements TbContext {
public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) {
RuleChainId ruleChainId = null;
QueueId queueId = null;
String queueName = null;
if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) {
DeviceId deviceId = new DeviceId(alarm.getOriginator().getId());
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
@ -367,10 +355,10 @@ class DefaultTbContext implements TbContext {
log.warn("[{}] Device profile is null!", deviceId);
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueId = deviceProfile.getDefaultQueueId();
queueName = deviceProfile.getDefaultQueueName();
}
}
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueId, ruleChainId);
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId);
}
@Override
@ -382,9 +370,9 @@ class DefaultTbContext implements TbContext {
return entityActionMsg(entity, id, ruleNodeId, action, null, null);
}
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, QueueId queueId, RuleChainId ruleChainId) {
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, String queueName, RuleChainId ruleChainId) {
try {
return TbMsg.newMsg(queueId, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null);
return TbMsg.newMsg(queueName, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null);
} catch (JsonProcessingException | IllegalArgumentException e) {
throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e);
}

View File

@ -293,7 +293,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
try {
checkComponentStateActive(msg);
EntityId entityId = msg.getOriginator();
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueId(), tenantId, entityId);
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);
if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore

View File

@ -75,6 +75,14 @@ public class QueueController extends BaseController {
return checkNotNull(queueService.findQueueById(getTenantId(), queueId));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN')")
@RequestMapping(value = "/queues/name/{queueName}", method = RequestMethod.GET)
@ResponseBody
public Queue getQueueByName(@PathVariable("queueName") String queueName) throws ThingsboardException {
checkParameter("queueName", queueName);
return checkNotNull(queueService.findQueueByTenantIdAndName(getTenantId(), queueName));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN')")
@RequestMapping(value = "/queues", params = {"serviceType"}, method = RequestMethod.POST)
@ResponseBody

View File

@ -48,9 +48,8 @@ public class DeviceProfileMsgConstructor {
// builder.setDefaultRuleChainIdMSB(deviceProfile.getDefaultRuleChainId().getId().getMostSignificantBits())
// .setDefaultRuleChainIdLSB(deviceProfile.getDefaultRuleChainId().getId().getLeastSignificantBits());
// }
if (deviceProfile.getDefaultQueueId() != null) {
builder.setDefaultQueueIdMSB(deviceProfile.getDefaultQueueId().getId().getMostSignificantBits())
.setDefaultQueueIdLSB(deviceProfile.getDefaultQueueId().getId().getLeastSignificantBits());
if (deviceProfile.getDefaultQueueName() != null) {
builder.setDefaultQueueName(deviceProfile.getDefaultQueueName());
}
if (deviceProfile.getDescription() != null) {
builder.setDescription(deviceProfile.getDescription());

View File

@ -158,21 +158,21 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
return metaData;
}
private Pair<QueueId, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
RuleChainId ruleChainId;
QueueId queueId;
String queueName;
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", entityId);
ruleChainId = null;
queueId = null;
queueName = null;
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueId = deviceProfile.getDefaultQueueId();
queueName = deviceProfile.getDefaultQueueName();
}
return new ImmutablePair<>(queueId, ruleChainId);
return new ImmutablePair<>(queueName, ruleChainId);
} else {
return new ImmutablePair<>(null, null);
}
@ -183,10 +183,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
metaData.putValue("ts", tsKv.getTs() + "");
Pair<QueueId, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
QueueId queueId = defaultQueueAndRuleChain.getKey();
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
TbMsg tbMsg = TbMsg.newMsg(queueId, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
@ -206,10 +204,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
Pair<QueueId, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
QueueId queueId = defaultQueueAndRuleChain.getKey();
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
TbMsg tbMsg = TbMsg.newMsg(queueId, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
@ -233,10 +229,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<String> keys) {
Pair<QueueId, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
QueueId queueId = defaultQueueAndRuleChain.getKey();
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
TbMsg tbMsg = TbMsg.newMsg(queueId, DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {

View File

@ -48,13 +48,11 @@ import java.util.stream.Collectors;
@TbCoreComponent
@AllArgsConstructor
public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService {
private static final String MAIN = "Main";
private static final long DELETE_DELAY = 30;
private final QueueService queueService;
private final TbClusterService tbClusterService;
private final TbQueueAdmin tbQueueAdmin;
private final DeviceProfileService deviceProfileService;
private final SchedulerComponent scheduler;
@Override
@ -205,35 +203,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
}
tenantIds.forEach(tenantId -> {
Map<QueueId, List<DeviceProfile>> deviceProfileQueues;
if (oldTenantProfile != null && !newTenantProfile.getId().equals(oldTenantProfile.getId()) || !toRemove.isEmpty()) {
List<DeviceProfile> deviceProfiles = deviceProfileService.findDeviceProfiles(tenantId, new PageLink(Integer.MAX_VALUE)).getData();
deviceProfileQueues = deviceProfiles.stream()
.filter(dp -> dp.getDefaultQueueId() != null)
.collect(Collectors.groupingBy(DeviceProfile::getDefaultQueueId));
} else {
deviceProfileQueues = Collections.emptyMap();
}
Map<String, QueueId> createdQueues = toCreate.stream()
.map(key -> saveQueue(new Queue(tenantId, newQueues.get(key))))
.collect(Collectors.toMap(Queue::getName, Queue::getId));
// assigning created queues to device profiles instead of system queues
if (oldTenantProfile != null && !oldTenantProfile.isIsolatedTbRuleEngine()) {
deviceProfileQueues.forEach((queueId, list) -> {
Queue queue = queueService.findQueueById(TenantId.SYS_TENANT_ID, queueId);
QueueId queueIdToAssign = createdQueues.get(queue.getName());
if (queueIdToAssign == null) {
queueIdToAssign = createdQueues.get(MAIN);
}
for (DeviceProfile deviceProfile : list) {
deviceProfile.setDefaultQueueId(queueIdToAssign);
saveDeviceProfile(deviceProfile);
}
});
}
toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key))));
toUpdate.forEach(key -> {
Queue queueToUpdate = new Queue(tenantId, newQueues.get(key));
@ -241,9 +211,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
queueToUpdate.setId(foundQueue.getId());
queueToUpdate.setCreatedTime(foundQueue.getCreatedTime());
if (queueToUpdate.equals(foundQueue)) {
//Queue not changed
} else {
if (!queueToUpdate.equals(foundQueue)) {
saveQueue(queueToUpdate);
}
});
@ -251,26 +219,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
toRemove.forEach(q -> {
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q);
QueueId queueIdForRemove = queue.getId();
if (deviceProfileQueues.containsKey(queueIdForRemove)) {
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, q);
if (foundQueue == null || queue.equals(foundQueue)) {
foundQueue = queueService.findQueueByTenantIdAndName(tenantId, MAIN);
}
QueueId newQueueId = foundQueue.getId();
deviceProfileQueues.get(queueIdForRemove).stream()
.peek(dp -> dp.setDefaultQueueId(newQueueId))
.forEach(this::saveDeviceProfile);
}
deleteQueue(tenantId, queueIdForRemove);
});
});
}
//TODO: remove after implementing TbDeviceProfileService
private void saveDeviceProfile(DeviceProfile deviceProfile) {
DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
tbClusterService.broadcastEntityStateChangeEvent(deviceProfile.getTenantId(), savedDeviceProfile.getId(), ComponentLifecycleEvent.UPDATED);
}
}

View File

@ -589,10 +589,6 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
} catch (Exception e) {
}
log.info("Updating device profiles...");
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql");
loadSql(schemaUpdateFile, conn);
log.info("Updating schema settings...");
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;");
log.info("Schema updated.");

View File

@ -158,7 +158,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
log.info("Updating data from version 3.3.4 to 3.4.0 ...");
rateLimitsUpdater.updateEntities();
tenantsProfileQueueConfigurationUpdater.updateEntities();
checkPointRuleNodesUpdater.updateEntities();
break;
default:
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
@ -629,47 +628,4 @@ public class DefaultDataUpdateService implements DataUpdateService {
return mainQueueConfiguration;
}
private final PaginatedUpdater<String, RuleNode> checkPointRuleNodesUpdater =
new PaginatedUpdater<>() {
@Override
protected String getName() {
return "Checkpoint rule nodes updater";
}
@Override
protected boolean forceReportTotal() {
return true;
}
@Override
protected PageData<RuleNode> findEntities(String id, PageLink pageLink) {
return ruleChainService.findAllRuleNodesByType("org.thingsboard.rule.engine.flow.TbCheckpointNode", pageLink);
}
@Override
protected void updateEntity(RuleNode ruleNode) {
updateCheckPointRuleNodeConfiguration(ruleNode);
}
};
private void updateCheckPointRuleNodeConfiguration(RuleNode node) {
try {
ObjectNode configuration = (ObjectNode) node.getConfiguration();
JsonNode queueNameNode = configuration.remove("queueName");
if (queueNameNode != null) {
RuleChain ruleChain = this.ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, node.getRuleChainId());
TenantId tenantId = ruleChain.getTenantId();
Map<String, QueueId> queues =
queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId));
String queueName = queueNameNode.asText();
QueueId queueId = queues.get(queueName);
configuration.put("queueId", queueId != null ? queueId.toString() : "");
ruleChainService.saveRuleNode(tenantId, node);
}
} catch (Exception e) {
log.error("Failed to update checkpoint rule node configuration name=["+node.getName()+"], id=["+ node.getId().getId() +"]", e);
}
}
}

View File

@ -181,7 +181,7 @@ public class DefaultTbClusterService implements TbClusterService {
tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
}
}
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueId(), tenantId, entityId);
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
@ -194,16 +194,16 @@ public class DefaultTbClusterService implements TbClusterService {
private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) {
if (deviceProfile != null) {
RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId();
QueueId targetQueueId = deviceProfile.getDefaultQueueId();
String targetQueueName = deviceProfile.getDefaultQueueName();
boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId());
boolean isQueueTransform = targetQueueId != null && !targetQueueId.equals(tbMsg.getQueueId());
boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName());
if (isRuleChainTransform && isQueueTransform) {
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueId);
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueName);
} else if (isRuleChainTransform) {
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId);
} else if (isQueueTransform) {
tbMsg = TbMsg.transformMsg(tbMsg, targetQueueId);
tbMsg = TbMsg.transformMsg(tbMsg, targetQueueName);
}
}
return tbMsg;

View File

@ -274,7 +274,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getId(), timeout, ctx);
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
if (timeout) {
printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
}
@ -339,7 +339,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
new TbMsgPackCallback(id, tenantId, ctx);
try {
if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
forwardToRuleEngineActor(configuration.getId(), tenantId, toRuleEngineMsg, callback);
forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
} else {
callback.onSuccess();
}
@ -353,7 +353,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
log.info("{} to process [{}] messages", prefix, map.size());
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
ToRuleEngineMsg tmp = pending.getValue().getValue();
TbMsg tmpMsg = TbMsg.fromBytes(configuration.getId(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
TbMsg tmpMsg = TbMsg.fromBytes(configuration.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
if (printAll) {
log.trace("[{}] {} to process message: {}, Last Rule Node: {}", TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
@ -461,8 +461,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
partitionService.removeQueue(queueDeleteMsg);
}
private void forwardToRuleEngineActor(QueueId queueId, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
TbMsg tbMsg = TbMsg.fromBytes(queueId, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
QueueToRuleEngineMsg msg;
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
Set<String> relationTypes = null;

View File

@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentMap;
public class TbRuleEngineProcessingResult {
@Getter
private final QueueId queueId;
private final String queueName;
@Getter
private final boolean success;
@Getter
@ -37,8 +37,8 @@ public class TbRuleEngineProcessingResult {
@Getter
private final TbMsgPackProcessingContext ctx;
public TbRuleEngineProcessingResult(QueueId queueId, boolean timeout, TbMsgPackProcessingContext ctx) {
this.queueId = queueId;
public TbRuleEngineProcessingResult(String queueName, boolean timeout, TbMsgPackProcessingContext ctx) {
this.queueName = queueName;
this.timeout = timeout;
this.ctx = ctx;
this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty();

View File

@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory {
}
log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
if (log.isTraceEnabled()) {
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
}
if (pauseBetweenRetries > 0) {
try {
@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory {
log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
}
if (log.isTraceEnabled()) {
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
}
if (log.isTraceEnabled()) {
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
}
return new TbRuleEngineProcessingDecision(true, null);
}

View File

@ -53,7 +53,6 @@ public class DeviceProfileImportService extends BaseEntityImportService<DevicePr
deviceProfile.setDefaultDashboardId(idProvider.getInternalId(deviceProfile.getDefaultDashboardId()));
deviceProfile.setFirmwareId(getOldEntityField(old, DeviceProfile::getFirmwareId));
deviceProfile.setSoftwareId(getOldEntityField(old, DeviceProfile::getSoftwareId));
deviceProfile.setDefaultQueueId(getOldEntityField(old, DeviceProfile::getDefaultQueueId));
return deviceProfile;
}

View File

@ -422,12 +422,6 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
}
protected DeviceProfile createDeviceProfile(String name, DeviceProfileTransportConfiguration deviceProfileTransportConfiguration) {
return createDeviceProfile(name, deviceProfileTransportConfiguration, null);
}
protected DeviceProfile createDeviceProfile(String name,
DeviceProfileTransportConfiguration deviceProfileTransportConfiguration,
QueueId defaultQueueId) {
DeviceProfile deviceProfile = new DeviceProfile();
deviceProfile.setName(name);
deviceProfile.setType(DeviceProfileType.DEFAULT);
@ -445,7 +439,6 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
deviceProfile.setProfileData(deviceProfileData);
deviceProfile.setDefault(false);
deviceProfile.setDefaultRuleChainId(null);
deviceProfile.setDefaultQueueId(defaultQueueId);
return deviceProfile;
}

View File

@ -39,7 +39,6 @@ import org.thingsboard.server.common.data.DeviceTransportType;
import org.thingsboard.server.common.data.OtaPackageInfo;
import org.thingsboard.server.common.data.SaveOtaPackageInfoRequest;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.audit.ActionType;
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
@ -49,15 +48,8 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
import org.thingsboard.server.common.data.queue.Queue;
import org.thingsboard.server.common.data.queue.SubmitStrategy;
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
import org.thingsboard.server.dao.exception.DataValidationException;
import java.util.ArrayList;
@ -389,55 +381,6 @@ public abstract class BaseDeviceProfileControllerTest extends AbstractController
.andExpect(statusReason(containsString("Can't assign dashboard from different tenant!")));
}
@Test
public void testSaveDeviceProfileWithQueueFromDifferentTenant() throws Exception {
loginDifferentTenant();
loginSysAdmin();
TenantProfile tenantProfile = new TenantProfile();
tenantProfile.setDefault(false);
tenantProfile.setName("Isolated TB Rule Engine");
tenantProfile.setDescription("Isolated TB Rule Engine tenant profile");
tenantProfile.setIsolatedTbCore(false);
tenantProfile.setIsolatedTbRuleEngine(true);
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
mainQueueConfiguration.setName("Main");
mainQueueConfiguration.setTopic("tb_rule_engine.main");
mainQueueConfiguration.setPollInterval(25);
mainQueueConfiguration.setPartitions(10);
mainQueueConfiguration.setConsumerPerPartition(true);
mainQueueConfiguration.setPackProcessingTimeout(2000);
SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
mainQueueSubmitStrategy.setBatchSize(1000);
mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy);
ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
mainQueueProcessingStrategy.setRetries(3);
mainQueueProcessingStrategy.setFailurePercentage(0);
mainQueueProcessingStrategy.setPauseBetweenRetries(3);
mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy);
TenantProfileData profileData = tenantProfile.getProfileData();
profileData.setQueueConfiguration(Collections.singletonList(mainQueueConfiguration));
tenantProfile.setProfileData(profileData);
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
savedDifferentTenant.setTenantProfileId(savedTenantProfile.getId());
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
loginDifferentTenant();
PageLink pageLink = new PageLink(1);
PageData<Queue> pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&",
new TypeReference<>() {}, pageLink);
Queue differentQueue = pageData.getData().get(0);
loginTenantAdmin();
DeviceProfile deviceProfile = this.createDeviceProfile("Device Profile");
deviceProfile.setDefaultQueueId(differentQueue.getId());
doPost("/api/deviceProfile", deviceProfile).andExpect(status().isBadRequest())
.andExpect(statusReason(containsString("Can't assign queue from different tenant!")));
}
@Test
public void testSaveDeviceProfileWithFirmwareFromDifferentTenant() throws Exception {
loginDifferentTenant();

View File

@ -163,7 +163,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
private Tenant savedTenant;
private TenantId tenantId;
private User tenantAdmin;
private QueueId defaultQueueId;
private DeviceProfile thermostatDeviceProfile;
@ -186,8 +185,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
tenantId = savedTenant.getId();
Assert.assertNotNull(savedTenant);
defaultQueueId = getRandomQueueId();
tenantAdmin = new User();
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
tenantAdmin.setTenantId(savedTenant.getId());
@ -378,7 +375,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
@Test
public void testDeviceProfiles() throws Exception {
// 1
DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null, defaultQueueId);
DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null);
extendDeviceProfileData(deviceProfile);
edgeImitator.expectMessageAmount(1);
deviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);
@ -389,8 +386,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType());
Assert.assertEquals(deviceProfileUpdateMsg.getIdMSB(), deviceProfile.getUuidId().getMostSignificantBits());
Assert.assertEquals(deviceProfileUpdateMsg.getIdLSB(), deviceProfile.getUuidId().getLeastSignificantBits());
Assert.assertEquals(defaultQueueId.getId().getMostSignificantBits(), deviceProfileUpdateMsg.getDefaultQueueIdMSB());
Assert.assertEquals(defaultQueueId.getId().getLeastSignificantBits(), deviceProfileUpdateMsg.getDefaultQueueIdLSB());
// 2
edgeImitator.expectMessageAmount(1);

View File

@ -74,12 +74,11 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
private RuleChainId defaultRuleChainId;
@ApiModelProperty(position = 6, value = "Reference to the dashboard. Used in the mobile application to open the default dashboard when user navigates to device details.")
private DashboardId defaultDashboardId;
@NoXss
@ApiModelProperty(position = 8, value = "Reference to the rule engine queue. " +
@ApiModelProperty(position = 8, value = "Rule engine queue name. " +
"If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " +
"Otherwise, the 'Main' queue will be used to store those messages.")
private QueueId defaultQueueId;
private String defaultQueueName;
@Valid
private transient DeviceProfileData profileData;
@ -113,7 +112,7 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
this.isDefault = deviceProfile.isDefault();
this.defaultRuleChainId = deviceProfile.getDefaultRuleChainId();
this.defaultDashboardId = deviceProfile.getDefaultDashboardId();
this.defaultQueueId = deviceProfile.getDefaultQueueId();
this.defaultQueueName = deviceProfile.getDefaultQueueName();
this.setProfileData(deviceProfile.getProfileData());
this.provisionDeviceKey = deviceProfile.getProvisionDeviceKey();
this.firmwareId = deviceProfile.getFirmwareId();
@ -174,13 +173,4 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
}
}
@JsonIgnore
public String getDefaultQueueName() {
return defaultQueueName;
}
@JsonProperty
public void setDefaultQueueName(String defaultQueueName) {
this.defaultQueueName = defaultQueueName;
}
}

View File

@ -216,8 +216,6 @@ message DeviceProfileUpdateMsg {
bytes profileDataBytes = 13;
optional string provisionDeviceKey = 14;
optional bytes image = 15;
int64 defaultQueueIdMSB = 16;
int64 defaultQueueIdLSB = 17;
}
message DeviceCredentialsUpdateMsg {

View File

@ -43,7 +43,7 @@ import java.util.UUID;
@Slf4j
public final class TbMsg implements Serializable {
private final QueueId queueId;
private final String queueName;
private final UUID id;
private final long ts;
private final String type;
@ -67,12 +67,12 @@ public final class TbMsg implements Serializable {
return ctx.getAndIncrementRuleNodeCounter();
}
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return newMsg(queueId, type, originator, null, metaData, data, ruleChainId, ruleNodeId);
public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return newMsg(queueName, type, originator, null, metaData, data, ruleChainId, ruleNodeId);
}
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return new TbMsg(queueId, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
}
@ -87,12 +87,12 @@ public final class TbMsg implements Serializable {
// REALLY NEW MSG
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return newMsg(queueId, type, originator, null, metaData, data);
public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return newMsg(queueName, type, originator, null, metaData, data);
}
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
return new TbMsg(queueId, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
}
@ -118,40 +118,40 @@ public final class TbMsg implements Serializable {
}
public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback);
}
public static TbMsg transformMsg(TbMsg tbMsg, CustomerId customerId) {
return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
}
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) {
return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback());
}
public static TbMsg transformMsg(TbMsg tbMsg, QueueId queueId) {
return new TbMsg(queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
public static TbMsg transformMsg(TbMsg tbMsg, String queueName) {
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ctx.copy(), tbMsg.getCallback());
}
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, QueueId queueId) {
return new TbMsg(queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) {
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback());
}
//used for enqueueForTellNext
public static TbMsg newMsg(TbMsg tbMsg, QueueId queueId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return new TbMsg(queueId, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
public static TbMsg newMsg(TbMsg tbMsg, String queueName, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY);
}
private TbMsg(QueueId queueId, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
this.id = id;
this.queueId = queueId;
this.queueName = queueName;
if (ts > 0) {
this.ts = ts;
} else {
@ -220,7 +220,7 @@ public final class TbMsg implements Serializable {
return builder.build().toByteArray();
}
public static TbMsg fromBytes(QueueId queueId, byte[] data, TbMsgCallback callback) {
public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) {
try {
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
@ -247,7 +247,7 @@ public final class TbMsg implements Serializable {
}
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
return new TbMsg(queueId, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId,
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId,
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, ctx, callback);
} catch (InvalidProtocolBufferException e) {
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
@ -259,12 +259,12 @@ public final class TbMsg implements Serializable {
}
public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) {
return new TbMsg(this.queueId, msgId, this.ts, this.type, this.originator, this.customerId,
return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId,
this.metaData, this.dataType, this.data, ruleChainId, null, this.ctx, callback);
}
public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) {
return new TbMsg(this.queueId, msgId, this.ts, this.type, this.originator, this.customerId,
return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId,
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback);
}

View File

@ -68,8 +68,6 @@ public class HashPartitionService implements PartitionService {
private final TenantRoutingInfoService tenantRoutingInfoService;
private final QueueRoutingInfoService queueRoutingInfoService;
private final ConcurrentMap<QueueId, QueueRoutingInfo> queuesById = new ConcurrentHashMap<>();
private ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
@ -121,7 +119,6 @@ public class HashPartitionService implements PartitionService {
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
partitionTopicsMap.put(queueKey, queue.getQueueTopic());
partitionSizesMap.put(queueKey, queue.getPartitions());
queuesById.put(queue.getQueueId(), queue);
});
}
@ -165,8 +162,6 @@ public class HashPartitionService implements PartitionService {
public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
QueueRoutingInfo queue = new QueueRoutingInfo(queueUpdateMsg);
queuesById.put(queue.getQueueId(), queue);
partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
myPartitions.remove(queueKey);
@ -176,7 +171,6 @@ public class HashPartitionService implements PartitionService {
public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
queuesById.remove(new QueueId(new UUID(queueDeleteMsg.getQueueIdMSB(), queueDeleteMsg.getQueueIdLSB())));
myPartitions.remove(queueKey);
partitionTopicsMap.remove(queueKey);
partitionSizesMap.remove(queueKey);
@ -185,9 +179,7 @@ public class HashPartitionService implements PartitionService {
}
@Override
@Deprecated
public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId, String queueName) {
log.warn("This method is deprecated and will be removed!!!");
public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId);
QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId);
if (!partitionSizesMap.containsKey(queueKey)) {
@ -201,32 +193,6 @@ public class HashPartitionService implements PartitionService {
return resolve(serviceType, null, tenantId, entityId);
}
@Override
public TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId) {
QueueKey queueKey;
if (queueId == null) {
queueKey = getMainQueueKey(serviceType, tenantId);
} else {
QueueRoutingInfo queueRoutingInfo = queuesById.get(queueId);
if (queueRoutingInfo == null) {
log.debug("Queue was removed but still used in CheckPoint rule node. [{}][{}]", tenantId, entityId);
queueKey = getMainQueueKey(serviceType, tenantId);
} else if (!queueRoutingInfo.getTenantId().equals(getIsolatedOrSystemTenantId(serviceType, tenantId))) {
log.debug("Tenant profile was changed but CheckPoint rule node still uses the queue from system level. [{}][{}]", tenantId, entityId);
queueKey = getMainQueueKey(serviceType, tenantId);
} else {
queueKey = new QueueKey(serviceType, queueRoutingInfo);
}
}
return resolve(queueKey, entityId);
}
private QueueKey getMainQueueKey(ServiceType serviceType, TenantId tenantId) {
return new QueueKey(serviceType, getIsolatedOrSystemTenantId(serviceType, tenantId));
}
private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
int hash = hashFunction.newHasher()
.putLong(entityId.getId().getMostSignificantBits())

View File

@ -32,13 +32,10 @@ import java.util.UUID;
*/
public interface PartitionService {
@Deprecated
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId, String queueName);
TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId);
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId);
TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId);
/**
* Received from the Discovery service when network topology is changed.
* @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo}

View File

@ -1088,7 +1088,7 @@ public class DefaultTransportService implements TransportService {
}
private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueId(), tenantId, tbMsg.getOriginator());
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());
if (log.isTraceEnabled()) {
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
}
@ -1105,18 +1105,18 @@ public class DefaultTransportService implements TransportService {
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
RuleChainId ruleChainId;
QueueId queueId;
String queueName;
if (deviceProfile == null) {
log.warn("[{}] Device profile is null!", deviceProfileId);
ruleChainId = null;
queueId = null;
queueName = null;
} else {
ruleChainId = deviceProfile.getDefaultRuleChainId();
queueId = deviceProfile.getDefaultQueueId();
queueName = deviceProfile.getDefaultQueueName();
}
TbMsg tbMsg = TbMsg.newMsg(queueId, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
sendToRuleEngine(tenantId, tbMsg, callback);
}

View File

@ -120,12 +120,6 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
DeviceProfile oldDeviceProfile = deviceProfileValidator.validate(deviceProfile, DeviceProfile::getTenantId);
DeviceProfile savedDeviceProfile;
try {
if (deviceProfile.getDefaultQueueId() == null && StringUtils.isNotEmpty(deviceProfile.getDefaultQueueName())) {
Queue existing = queueService.findQueueByTenantIdAndName(deviceProfile.getTenantId(), deviceProfile.getDefaultQueueName());
if (existing != null) {
deviceProfile.setDefaultQueueId(existing.getId());
}
}
savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile);
publishEvictEvent(new DeviceProfileEvictEvent(savedDeviceProfile.getTenantId(), savedDeviceProfile.getName(),
oldDeviceProfile != null ? oldDeviceProfile.getName() : null, savedDeviceProfile.getId(), savedDeviceProfile.isDefault()));

View File

@ -181,6 +181,7 @@ public class ModelConstants {
public static final String DEVICE_PROFILE_DEFAULT_RULE_CHAIN_ID_PROPERTY = "default_rule_chain_id";
public static final String DEVICE_PROFILE_DEFAULT_DASHBOARD_ID_PROPERTY = "default_dashboard_id";
public static final String DEVICE_PROFILE_DEFAULT_QUEUE_ID_PROPERTY = "default_queue_id";
public static final String DEVICE_PROFILE_DEFAULT_QUEUE_NAME_PROPERTY = "default_queue_name";
public static final String DEVICE_PROFILE_PROVISION_DEVICE_KEY = "provision_device_key";
public static final String DEVICE_PROFILE_FIRMWARE_ID_PROPERTY = "firmware_id";
public static final String DEVICE_PROFILE_SOFTWARE_ID_PROPERTY = "software_id";

View File

@ -88,8 +88,8 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
@Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_DASHBOARD_ID_PROPERTY)
private UUID defaultDashboardId;
@Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_QUEUE_ID_PROPERTY)
private UUID defaultQueueId;
@Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_QUEUE_NAME_PROPERTY)
private String defaultQueueName;
@Type(type = "jsonb")
@Column(name = ModelConstants.DEVICE_PROFILE_PROFILE_DATA_PROPERTY, columnDefinition = "jsonb")
@ -133,9 +133,7 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
if (deviceProfile.getDefaultDashboardId() != null) {
this.defaultDashboardId = deviceProfile.getDefaultDashboardId().getId();
}
if (deviceProfile.getDefaultQueueId() != null) {
this.defaultQueueId = deviceProfile.getDefaultQueueId().getId();
}
this.defaultQueueName = deviceProfile.getDefaultQueueName();
this.provisionDeviceKey = deviceProfile.getProvisionDeviceKey();
if (deviceProfile.getFirmwareId() != null) {
this.firmwareId = deviceProfile.getFirmwareId().getId();
@ -176,6 +174,7 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
deviceProfile.setProvisionType(provisionType);
deviceProfile.setDescription(description);
deviceProfile.setDefault(isDefault);
deviceProfile.setDefaultQueueName(defaultQueueName);
deviceProfile.setProfileData(JacksonUtil.convertValue(profileData, DeviceProfileData.class));
if (defaultRuleChainId != null) {
deviceProfile.setDefaultRuleChainId(new RuleChainId(defaultRuleChainId));
@ -183,15 +182,11 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
if (defaultDashboardId != null) {
deviceProfile.setDefaultDashboardId(new DashboardId(defaultDashboardId));
}
if (defaultQueueId != null) {
deviceProfile.setDefaultQueueId(new QueueId(defaultQueueId));
}
deviceProfile.setProvisionDeviceKey(provisionDeviceKey);
if (firmwareId != null) {
deviceProfile.setFirmwareId(new OtaPackageId(firmwareId));
}
if (softwareId != null) {
deviceProfile.setSoftwareId(new OtaPackageId(softwareId));
}

View File

@ -126,16 +126,11 @@ public class DeviceProfileDataValidator extends AbstractHasOtaPackageValidator<D
throw new DataValidationException("Another default device profile is present in scope of current tenant!");
}
}
if (deviceProfile.getDefaultQueueId() != null) {
Queue queue = queueService.findQueueById(tenantId, deviceProfile.getDefaultQueueId());
if (deviceProfile.getDefaultQueueName() != null) {
Queue queue = queueService.findQueueByTenantIdAndName(tenantId, deviceProfile.getDefaultQueueName());
if (queue == null) {
throw new DataValidationException("Device profile is referencing to non-existent queue!");
}
TenantProfile tenantProfile = tenantProfileCache.get(deviceProfile.getTenantId());
if ((tenantProfile.isIsolatedTbRuleEngine() && !queue.getTenantId().equals(deviceProfile.getTenantId()))
|| (!tenantProfile.isIsolatedTbRuleEngine() && !queue.getTenantId().isNullUid())) {
throw new DataValidationException("Can't assign queue from different tenant!");
}
}
if (deviceProfile.getProvisionType() == null) {
deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED);

View File

@ -70,4 +70,6 @@ CREATE INDEX IF NOT EXISTS idx_widgets_bundle_external_id ON widgets_bundle(tena
CREATE INDEX IF NOT EXISTS idx_rule_node_external_id ON rule_node(rule_chain_id, external_id);
CREATE INDEX IF NOT EXISTS idx_rule_node_type ON rule_node(type);
CREATE INDEX IF NOT EXISTS idx_rule_node_type ON rule_node(type);
CREATE INDEX IF NOT EXISTS idx_api_usage_state_entity_id ON api_usage_state(entity_id);

View File

@ -253,14 +253,13 @@ CREATE TABLE IF NOT EXISTS device_profile (
software_id uuid,
default_rule_chain_id uuid,
default_dashboard_id uuid,
default_queue_id uuid,
default_queue_name varchar(255),
provision_device_key varchar,
external_id uuid,
CONSTRAINT device_profile_name_unq_key UNIQUE (tenant_id, name),
CONSTRAINT device_provision_key_unq_key UNIQUE (provision_device_key),
CONSTRAINT fk_default_rule_chain_device_profile FOREIGN KEY (default_rule_chain_id) REFERENCES rule_chain(id),
CONSTRAINT fk_default_dashboard_device_profile FOREIGN KEY (default_dashboard_id) REFERENCES dashboard(id),
CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue(id),
CONSTRAINT fk_firmware_device_profile FOREIGN KEY (firmware_id) REFERENCES ota_package(id),
CONSTRAINT fk_software_device_profile FOREIGN KEY (software_id) REFERENCES ota_package(id)
);

View File

@ -142,15 +142,12 @@ public interface TbContext {
*/
void output(TbMsg msg, String relationType);
@Deprecated
void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
/**
* Puts new message to custom queue for processing
*
* @param msg - message
*/
void enqueue(TbMsg msg, QueueId queueId, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueueForTellFailure(TbMsg msg, String failureMessage);
@ -162,15 +159,15 @@ public interface TbContext {
void enqueueForTellNext(TbMsg msg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueueForTellNext(TbMsg msg, QueueId queueId, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueueForTellNext(TbMsg msg, QueueId queueId, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure);
void enqueueForTellNext(TbMsg msg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure);
void ack(TbMsg tbMsg);
TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data);
TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data);
TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data);
TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data);
TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);

View File

@ -138,7 +138,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
}
private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueId(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS);
ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS);
}
private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {

View File

@ -77,7 +77,7 @@ public class TbMsgCountNode implements TbNode {
TbMsgMetaData metaData = new TbMsgMetaData();
metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay));
TbMsg tbMsg = TbMsg.newMsg(msg.getQueueId(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson));
TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson));
ctx.enqueueForTellNext(tbMsg, SUCCESS);
scheduleTickMsg(ctx, tbMsg);
} else {

View File

@ -41,17 +41,17 @@ import java.util.UUID;
)
public class TbCheckpointNode implements TbNode {
private QueueId queueId;
private String queueName;
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
TbCheckpointNodeConfiguration config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class);
this.queueId = new QueueId(UUID.fromString(config.getQueueId()));
this.queueName = config.getQueueName();
}
@Override
public void onMsg(TbContext ctx, TbMsg msg) {
ctx.enqueueForTellNext(msg, queueId, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
ctx.enqueueForTellNext(msg, queueName, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
}
@Override

View File

@ -22,7 +22,7 @@ import org.thingsboard.server.common.data.id.QueueId;
@Data
public class TbCheckpointNodeConfiguration implements NodeConfiguration<TbCheckpointNodeConfiguration> {
private String queueId;
private String queueName;
@Override
public TbCheckpointNodeConfiguration defaultConfiguration() {

View File

@ -60,7 +60,7 @@ class AlarmState {
private volatile Alarm currentAlarm;
private volatile boolean initialFetchDone;
private volatile TbMsgMetaData lastMsgMetaData;
private volatile QueueId lastMsgQueueId;
private volatile String lastMsgQueueName;
private volatile DataSnapshot dataSnapshot;
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
@ -74,7 +74,7 @@ class AlarmState {
public boolean process(TbContext ctx, TbMsg msg, DataSnapshot data, SnapshotUpdate update) throws ExecutionException, InterruptedException {
initCurrentAlarm(ctx);
lastMsgMetaData = msg.getMetaData();
lastMsgQueueId = msg.getQueueId();
lastMsgQueueName = msg.getQueueName();
this.dataSnapshot = data;
try {
return createOrClearAlarms(ctx, msg, data, update, AlarmRuleState::eval);
@ -195,7 +195,7 @@ class AlarmState {
metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
}
setAlarmConditionMetadata(ruleState, metaData);
TbMsg newMsg = ctx.newMsg(lastMsgQueueId != null ? lastMsgQueueId : null, "ALARM",
TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : null, "ALARM",
originator, msg != null ? msg.getCustomerId() : null, metaData, data);
ctx.enqueueForTellNext(newMsg, relationType);
}

View File

@ -116,10 +116,10 @@ public class TbSendRPCRequestNode implements TbNode {
ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
if (ruleEngineDeviceRpcResponse.getError().isEmpty()) {
TbMsg next = ctx.newMsg(msg.getQueueId(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
} else {
TbMsg next = ctx.newMsg(msg.getQueueId(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
ctx.enqueueForTellFailure(next, ruleEngineDeviceRpcResponse.getError().get().name());
}
});

View File

@ -35,6 +35,10 @@ export class QueueService {
return this.http.get<QueueInfo>(`/api/queues/${queueId}`, defaultHttpOptionsFromConfig(config));
}
public getQueueByName(queueName: string, config?: RequestConfig): Observable<QueueInfo> {
return this.http.get<QueueInfo>(`/api/queues/name/${queueName}`, defaultHttpOptionsFromConfig(config));
}
public getTenantQueuesByServiceType(pageLink: PageLink,
serviceType: ServiceType,
config?: RequestConfig): Observable<PageData<QueueInfo>> {

View File

@ -56,7 +56,7 @@
</tb-dashboard-autocomplete>
<tb-queue-autocomplete
[queueType]="serviceType"
formControlName="defaultQueueId">
formControlName="defaultQueueName">
</tb-queue-autocomplete>
<mat-form-field fxHide class="mat-block">
<mat-label translate>device-profile.type</mat-label>

View File

@ -50,7 +50,6 @@ import { StepperSelectionEvent } from '@angular/cdk/stepper';
import { deepTrim } from '@core/utils';
import { ServiceType } from '@shared/models/queue.models';
import { DashboardId } from '@shared/models/id/dashboard-id';
import { QueueId } from '@shared/models/id/queue-id';
export interface AddDeviceProfileDialogData {
deviceProfileName: string;
@ -111,7 +110,7 @@ export class AddDeviceProfileDialogComponent extends
image: [null, []],
defaultRuleChainId: [null, []],
defaultDashboardId: [null, []],
defaultQueueId: [null, []],
defaultQueueName: [null, []],
description: ['', []]
}
);
@ -188,6 +187,7 @@ export class AddDeviceProfileDialogComponent extends
name: this.deviceProfileDetailsFormGroup.get('name').value,
type: this.deviceProfileDetailsFormGroup.get('type').value,
image: this.deviceProfileDetailsFormGroup.get('image').value,
defaultQueueName: this.deviceProfileDetailsFormGroup.get('defaultQueueName').value,
transportType: this.transportConfigFormGroup.get('transportType').value,
provisionType: deviceProvisionConfiguration.type,
provisionDeviceKey,
@ -205,9 +205,6 @@ export class AddDeviceProfileDialogComponent extends
if (this.deviceProfileDetailsFormGroup.get('defaultDashboardId').value) {
deviceProfile.defaultDashboardId = new DashboardId(this.deviceProfileDetailsFormGroup.get('defaultDashboardId').value);
}
if (this.deviceProfileDetailsFormGroup.get('defaultQueueId').value) {
deviceProfile.defaultQueueId = new QueueId(this.deviceProfileDetailsFormGroup.get('defaultQueueId').value);
}
this.deviceProfileService.saveDeviceProfile(deepTrim(deviceProfile)).subscribe(
(savedDeviceProfile) => {
this.dialogRef.close(savedDeviceProfile);

View File

@ -75,7 +75,7 @@
</tb-dashboard-autocomplete>
<tb-queue-autocomplete
[queueType]="serviceType"
formControlName="defaultQueueId">
formControlName="defaultQueueName">
</tb-queue-autocomplete>
<tb-ota-package-autocomplete
[useFullEntityId]="true"

View File

@ -42,7 +42,6 @@ import { ServiceType } from '@shared/models/queue.models';
import { EntityId } from '@shared/models/id/entity-id';
import { OtaUpdateType } from '@shared/models/ota-package.models';
import { DashboardId } from '@shared/models/id/dashboard-id';
import { QueueId } from '@shared/models/id/queue-id';
@Component({
selector: 'tb-device-profile',
@ -118,7 +117,7 @@ export class DeviceProfileComponent extends EntityComponent<DeviceProfile> {
}),
defaultRuleChainId: [entity && entity.defaultRuleChainId ? entity.defaultRuleChainId.id : null, []],
defaultDashboardId: [entity && entity.defaultDashboardId ? entity.defaultDashboardId.id : null, []],
defaultQueueId: [entity && entity.defaultQueueId ? entity.defaultQueueId.id : null, []],
defaultQueueName: [entity ? entity.defaultQueueName : null, []],
firmwareId: [entity ? entity.firmwareId : null],
softwareId: [entity ? entity.softwareId : null],
description: [entity ? entity.description : '', []],
@ -198,7 +197,7 @@ export class DeviceProfileComponent extends EntityComponent<DeviceProfile> {
}}, {emitEvent: false});
this.entityForm.patchValue({defaultRuleChainId: entity.defaultRuleChainId ? entity.defaultRuleChainId.id : null}, {emitEvent: false});
this.entityForm.patchValue({defaultDashboardId: entity.defaultDashboardId ? entity.defaultDashboardId.id : null}, {emitEvent: false});
this.entityForm.patchValue({defaultQueueId: entity.defaultQueueId ? entity.defaultQueueId.id : null}, {emitEvent: false});
this.entityForm.patchValue({defaultQueueName: entity.defaultQueueName}, {emitEvent: false});
this.entityForm.patchValue({firmwareId: entity.firmwareId}, {emitEvent: false});
this.entityForm.patchValue({softwareId: entity.softwareId}, {emitEvent: false});
this.entityForm.patchValue({description: entity.description}, {emitEvent: false});
@ -211,9 +210,6 @@ export class DeviceProfileComponent extends EntityComponent<DeviceProfile> {
if (formValue.defaultDashboardId) {
formValue.defaultDashboardId = new DashboardId(formValue.defaultDashboardId);
}
if (formValue.defaultQueueId) {
formValue.defaultQueueId = new QueueId(formValue.defaultQueueId);
}
const deviceProvisionConfiguration: DeviceProvisionConfiguration = formValue.profileData.provisionConfiguration;
formValue.provisionType = deviceProvisionConfiguration.type;
formValue.provisionDeviceKey = deviceProvisionConfiguration.provisionDeviceKey;

View File

@ -95,7 +95,7 @@
<tb-queue-autocomplete
[ngClass]="{invisible: deviceWizardFormGroup.get('addProfileType').value !== 1}"
[queueType]="serviceType"
formControlName="defaultQueueId">
formControlName="defaultQueueName">
</tb-queue-autocomplete>
</div>
</div>

View File

@ -49,7 +49,6 @@ import { MediaBreakpoints } from '@shared/models/constants';
import { RuleChainId } from '@shared/models/id/rule-chain-id';
import { ServiceType } from '@shared/models/queue.models';
import { deepTrim } from '@core/utils';
import { QueueId } from '@shared/models/id/queue-id';
@Component({
selector: 'tb-device-wizard',
@ -114,7 +113,7 @@ export class DeviceWizardDialogComponent extends
deviceProfileId: [null, Validators.required],
newDeviceProfileTitle: [{value: null, disabled: true}],
defaultRuleChainId: [{value: null, disabled: true}],
defaultQueueId: [{value: null, disabled: true}],
defaultQueueName: [{value: null, disabled: true}],
description: ['']
}
);
@ -127,7 +126,7 @@ export class DeviceWizardDialogComponent extends
this.deviceWizardFormGroup.get('newDeviceProfileTitle').setValidators(null);
this.deviceWizardFormGroup.get('newDeviceProfileTitle').disable();
this.deviceWizardFormGroup.get('defaultRuleChainId').disable();
this.deviceWizardFormGroup.get('defaultQueueId').disable();
this.deviceWizardFormGroup.get('defaultQueueName').disable();
this.deviceWizardFormGroup.updateValueAndValidity();
this.createProfile = false;
} else {
@ -136,7 +135,7 @@ export class DeviceWizardDialogComponent extends
this.deviceWizardFormGroup.get('newDeviceProfileTitle').setValidators([Validators.required]);
this.deviceWizardFormGroup.get('newDeviceProfileTitle').enable();
this.deviceWizardFormGroup.get('defaultRuleChainId').enable();
this.deviceWizardFormGroup.get('defaultQueueId').enable();
this.deviceWizardFormGroup.get('defaultQueueName').enable();
this.deviceWizardFormGroup.updateValueAndValidity();
this.createProfile = true;
@ -298,6 +297,7 @@ export class DeviceWizardDialogComponent extends
const deviceProfile: DeviceProfile = {
name: this.deviceWizardFormGroup.get('newDeviceProfileTitle').value,
type: DeviceProfileType.DEFAULT,
defaultQueueName: this.deviceWizardFormGroup.get('defaultQueueName').value,
transportType: this.transportConfigFormGroup.get('transportType').value,
provisionType: deviceProvisionConfiguration.type,
provisionDeviceKey,
@ -311,9 +311,6 @@ export class DeviceWizardDialogComponent extends
if (this.deviceWizardFormGroup.get('defaultRuleChainId').value) {
deviceProfile.defaultRuleChainId = new RuleChainId(this.deviceWizardFormGroup.get('defaultRuleChainId').value);
}
if (this.deviceWizardFormGroup.get('defaultQueueId').value) {
deviceProfile.defaultQueueId = new QueueId(this.deviceWizardFormGroup.get('defaultQueueId').value);
}
return this.deviceProfileService.saveDeviceProfile(deepTrim(deviceProfile)).pipe(
tap((profile) => {
this.currentDeviceProfileTransportType = profile.transportType;

View File

@ -18,11 +18,11 @@
<mat-form-field [formGroup]="selectQueueFormGroup" class="mat-block autocomplete-queue">
<input matInput type="text" placeholder="{{ 'queue.queue-name' | translate }}"
#queueInput
formControlName="queueId"
formControlName="queueName"
(focusin)="onFocus()"
[required]="required"
[matAutocomplete]="queueAutocomplete">
<button *ngIf="selectQueueFormGroup.get('queueId').value && !disabled"
<button *ngIf="selectQueueFormGroup.get('queueName').value && !disabled"
type="button"
matSuffix mat-button mat-icon-button aria-label="Clear"
(click)="clear()">
@ -50,7 +50,7 @@
</div>
</mat-option>
</mat-autocomplete>
<mat-error *ngIf="selectQueueFormGroup.get('queueId').hasError('required')">
<mat-error *ngIf="selectQueueFormGroup.get('queueName').hasError('required')">
{{ 'queue.queue-required' | translate }}
</mat-error>
</mat-form-field>

View File

@ -23,7 +23,6 @@ import { AppState } from '@core/core.state';
import { TranslateService } from '@ngx-translate/core';
import { coerceBooleanProperty } from '@angular/cdk/coercion';
import { EntityId } from '@shared/models/id/entity-id';
import { EntityType } from '@shared/models/entity-type.models';
import { BaseData } from '@shared/models/base-data';
import { EntityService } from '@core/http/entity.service';
import { TruncatePipe } from '@shared/pipe/truncate.pipe';
@ -87,7 +86,7 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
private queueService: QueueService,
private fb: FormBuilder) {
this.selectQueueFormGroup = this.fb.group({
queueId: [null]
queueName: [null]
});
}
@ -99,7 +98,7 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
}
ngOnInit() {
this.filteredQueues = this.selectQueueFormGroup.get('queueId').valueChanges
this.filteredQueues = this.selectQueueFormGroup.get('queueName').valueChanges
.pipe(
debounceTime(150),
tap(value => {
@ -107,7 +106,7 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
if (typeof value === 'string' || !value) {
modelValue = null;
} else {
modelValue = value.id.id;
modelValue = value.name;
}
this.updateView(modelValue);
if (value === null) {
@ -123,15 +122,6 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
ngAfterViewInit(): void {}
getCurrentEntity(): BaseData<EntityId> | null {
const currentRuleChain = this.selectQueueFormGroup.get('queueId').value;
if (currentRuleChain && typeof currentRuleChain !== 'string') {
return currentRuleChain as BaseData<EntityId>;
} else {
return null;
}
}
setDisabledState(isDisabled: boolean): void {
this.disabled = isDisabled;
if (this.disabled) {
@ -148,15 +138,14 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
writeValue(value: string | null): void {
this.searchText = '';
if (value != null) {
const targetEntityType = EntityType.QUEUE;
this.entityService.getEntity(targetEntityType, value, {ignoreLoading: true, ignoreErrors: true}).subscribe(
this.queueService.getQueueByName(value, {ignoreLoading: true, ignoreErrors: true}).subscribe(
(entity) => {
this.modelValue = entity.id.id;
this.selectQueueFormGroup.get('queueId').patchValue(entity, {emitEvent: false});
this.modelValue = entity.name;
this.selectQueueFormGroup.get('queueName').patchValue(entity, {emitEvent: false});
},
() => {
this.modelValue = null;
this.selectQueueFormGroup.get('queueId').patchValue('', {emitEvent: false});
this.selectQueueFormGroup.get('queueName').patchValue('', {emitEvent: false});
if (value !== null) {
this.propagateChange(this.modelValue);
}
@ -164,20 +153,20 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
);
} else {
this.modelValue = null;
this.selectQueueFormGroup.get('queueId').patchValue('', {emitEvent: false});
this.selectQueueFormGroup.get('queueName').patchValue('', {emitEvent: false});
}
this.dirty = true;
}
onFocus() {
if (this.dirty) {
this.selectQueueFormGroup.get('queueId').updateValueAndValidity({onlySelf: true, emitEvent: true});
this.selectQueueFormGroup.get('queueName').updateValueAndValidity({onlySelf: true, emitEvent: true});
this.dirty = false;
}
}
reset() {
this.selectQueueFormGroup.get('queueId').patchValue('', {emitEvent: false});
this.selectQueueFormGroup.get('queueName').patchValue('', {emitEvent: false});
}
updateView(value: string | null) {
@ -214,7 +203,7 @@ export class QueueAutocompleteComponent implements ControlValueAccessor, OnInit
}
clear() {
this.selectQueueFormGroup.get('queueId').patchValue('', {emitEvent: true});
this.selectQueueFormGroup.get('queueName').patchValue('', {emitEvent: true});
setTimeout(() => {
this.queueInput.nativeElement.blur();
this.queueInput.nativeElement.focus();

View File

@ -29,7 +29,6 @@ import * as _moment from 'moment';
import { AbstractControl, ValidationErrors } from '@angular/forms';
import { OtaPackageId } from '@shared/models/id/ota-package-id';
import { DashboardId } from '@shared/models/id/dashboard-id';
import { QueueId } from '@shared/models/id/queue-id';
import { DataType } from '@shared/models/constants';
import {
getDefaultProfileClientLwM2mSettingsConfig,
@ -576,7 +575,7 @@ export interface DeviceProfile extends BaseData<DeviceProfileId>, ExportableEnti
provisionDeviceKey?: string;
defaultRuleChainId?: RuleChainId;
defaultDashboardId?: DashboardId;
defaultQueueId?: QueueId;
defaultQueueName?: string;
firmwareId?: OtaPackageId;
softwareId?: OtaPackageId;
profileData: DeviceProfileData;