Merge remote-tracking branch 'upstream/bug/queue-id-to-name' into bug/queue-id-to-name
This commit is contained in:
commit
a411d9c8a3
@ -1,49 +0,0 @@
|
||||
--
|
||||
-- Copyright © 2016-2022 The Thingsboard Authors
|
||||
--
|
||||
-- Licensed under the Apache License, Version 2.0 (the "License");
|
||||
-- you may not use this file except in compliance with the License.
|
||||
-- You may obtain a copy of the License at
|
||||
--
|
||||
-- http://www.apache.org/licenses/LICENSE-2.0
|
||||
--
|
||||
-- Unless required by applicable law or agreed to in writing, software
|
||||
-- distributed under the License is distributed on an "AS IS" BASIS,
|
||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
-- See the License for the specific language governing permissions and
|
||||
-- limitations under the License.
|
||||
--
|
||||
|
||||
ALTER TABLE device_profile
|
||||
ADD COLUMN IF NOT EXISTS default_queue_id uuid;
|
||||
|
||||
DO
|
||||
$$
|
||||
BEGIN
|
||||
IF EXISTS
|
||||
(SELECT column_name
|
||||
FROM information_schema.columns
|
||||
WHERE table_name = 'device_profile'
|
||||
AND column_name = 'default_queue_name'
|
||||
)
|
||||
THEN
|
||||
UPDATE device_profile
|
||||
SET default_queue_id = q.id
|
||||
FROM queue as q
|
||||
WHERE default_queue_name = q.name;
|
||||
END IF;
|
||||
END
|
||||
$$;
|
||||
|
||||
DO
|
||||
$$
|
||||
BEGIN
|
||||
IF NOT EXISTS(SELECT 1 FROM pg_constraint WHERE conname = 'fk_default_queue_device_profile') THEN
|
||||
ALTER TABLE device_profile
|
||||
ADD CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue (id);
|
||||
END IF;
|
||||
END;
|
||||
$$;
|
||||
|
||||
ALTER TABLE device_profile
|
||||
DROP COLUMN IF EXISTS default_queue_name;
|
||||
@ -508,13 +508,8 @@ public class ActorSystemContext {
|
||||
return partitionService.resolve(serviceType, tenantId, entityId);
|
||||
}
|
||||
|
||||
public TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId) {
|
||||
return partitionService.resolve(serviceType, queueId, tenantId, entityId);
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
|
||||
return partitionService.resolve(serviceType, tenantId, entityId, queueName);
|
||||
return partitionService.resolve(serviceType, queueName, tenantId, entityId);
|
||||
}
|
||||
|
||||
public String getServiceId() {
|
||||
|
||||
@ -162,18 +162,11 @@ class DefaultTbContext implements TbContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
|
||||
enqueue(tpi, tbMsg, onFailure, onSuccess);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enqueue(TbMsg tbMsg, QueueId queueId, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId);
|
||||
enqueue(tpi, tbMsg, onFailure, onSuccess);
|
||||
}
|
||||
|
||||
private void enqueue(TopicPartitionInfo tpi, TbMsg tbMsg, Consumer<Throwable> onFailure, Runnable onSuccess) {
|
||||
if (!tbMsg.isValid()) {
|
||||
log.trace("[{}] Skip invalid message: {}", getTenantId(), tbMsg);
|
||||
@ -223,35 +216,30 @@ class DefaultTbContext implements TbContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enqueueForTellNext(TbMsg tbMsg, QueueId queueId, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId);
|
||||
enqueueForTellNext(tpi, queueId, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
|
||||
public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
|
||||
enqueueForTellNext(tpi, queueName, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void enqueueForTellNext(TbMsg tbMsg, QueueId queueId, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueId);
|
||||
enqueueForTellNext(tpi, queueId, tbMsg, relationTypes, null, onSuccess, onFailure);
|
||||
public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
|
||||
enqueueForTellNext(tpi, queueName, tbMsg, relationTypes, null, onSuccess, onFailure);
|
||||
}
|
||||
|
||||
private TopicPartitionInfo resolvePartition(TbMsg tbMsg, QueueId queueId) {
|
||||
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueId, getTenantId(), tbMsg.getOriginator());
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
|
||||
return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
|
||||
}
|
||||
|
||||
private TopicPartitionInfo resolvePartition(TbMsg tbMsg) {
|
||||
return resolvePartition(tbMsg, tbMsg.getQueueId());
|
||||
return resolvePartition(tbMsg, tbMsg.getQueueName());
|
||||
}
|
||||
|
||||
private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
enqueueForTellNext(tpi, source.getQueueId(), source, relationTypes, failureMessage, onSuccess, onFailure);
|
||||
enqueueForTellNext(tpi, source.getQueueName(), source, relationTypes, failureMessage, onSuccess, onFailure);
|
||||
}
|
||||
|
||||
private void enqueueForTellNext(TopicPartitionInfo tpi, QueueId queueId, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
private void enqueueForTellNext(TopicPartitionInfo tpi, String queueName, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
|
||||
if (!source.isValid()) {
|
||||
log.trace("[{}] Skip invalid message: {}", getTenantId(), source);
|
||||
if (onFailure != null) {
|
||||
@ -261,7 +249,7 @@ class DefaultTbContext implements TbContext {
|
||||
}
|
||||
RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
|
||||
RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
|
||||
TbMsg tbMsg = TbMsg.newMsg(source, queueId, ruleChainId, ruleNodeId);
|
||||
TbMsg tbMsg = TbMsg.newMsg(source, queueName, ruleChainId, ruleNodeId);
|
||||
TransportProtos.ToRuleEngineMsg.Builder msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
|
||||
@ -320,13 +308,13 @@ class DefaultTbContext implements TbContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return newMsg(queueId, type, originator, null, metaData, data);
|
||||
public TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return newMsg(queueName, type, originator, null, metaData, data);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||
return TbMsg.newMsg(queueId, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||
public TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||
return TbMsg.newMsg(queueName, type, originator, customerId, metaData, data, nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -340,17 +328,17 @@ class DefaultTbContext implements TbContext {
|
||||
|
||||
public TbMsg deviceCreatedMsg(Device device, RuleNodeId ruleNodeId) {
|
||||
RuleChainId ruleChainId = null;
|
||||
QueueId queueId = null;
|
||||
String queueName = null;
|
||||
if (device.getDeviceProfileId() != null) {
|
||||
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().find(device.getDeviceProfileId());
|
||||
if (deviceProfile == null) {
|
||||
log.warn("[{}] Device profile is null!", device.getDeviceProfileId());
|
||||
} else {
|
||||
ruleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
queueId = deviceProfile.getDefaultQueueId();
|
||||
queueName = deviceProfile.getDefaultQueueName();
|
||||
}
|
||||
}
|
||||
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueId, ruleChainId);
|
||||
return entityActionMsg(device, device.getId(), ruleNodeId, DataConstants.ENTITY_CREATED, queueName, ruleChainId);
|
||||
}
|
||||
|
||||
public TbMsg assetCreatedMsg(Asset asset, RuleNodeId ruleNodeId) {
|
||||
@ -359,7 +347,7 @@ class DefaultTbContext implements TbContext {
|
||||
|
||||
public TbMsg alarmActionMsg(Alarm alarm, RuleNodeId ruleNodeId, String action) {
|
||||
RuleChainId ruleChainId = null;
|
||||
QueueId queueId = null;
|
||||
String queueName = null;
|
||||
if (EntityType.DEVICE.equals(alarm.getOriginator().getEntityType())) {
|
||||
DeviceId deviceId = new DeviceId(alarm.getOriginator().getId());
|
||||
DeviceProfile deviceProfile = mainCtx.getDeviceProfileCache().get(getTenantId(), deviceId);
|
||||
@ -367,10 +355,10 @@ class DefaultTbContext implements TbContext {
|
||||
log.warn("[{}] Device profile is null!", deviceId);
|
||||
} else {
|
||||
ruleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
queueId = deviceProfile.getDefaultQueueId();
|
||||
queueName = deviceProfile.getDefaultQueueName();
|
||||
}
|
||||
}
|
||||
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueId, ruleChainId);
|
||||
return entityActionMsg(alarm, alarm.getId(), ruleNodeId, action, queueName, ruleChainId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -382,9 +370,9 @@ class DefaultTbContext implements TbContext {
|
||||
return entityActionMsg(entity, id, ruleNodeId, action, null, null);
|
||||
}
|
||||
|
||||
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, QueueId queueId, RuleChainId ruleChainId) {
|
||||
public <E, I extends EntityId> TbMsg entityActionMsg(E entity, I id, RuleNodeId ruleNodeId, String action, String queueName, RuleChainId ruleChainId) {
|
||||
try {
|
||||
return TbMsg.newMsg(queueId, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null);
|
||||
return TbMsg.newMsg(queueName, action, id, getActionMetaData(ruleNodeId), mapper.writeValueAsString(mapper.valueToTree(entity)), ruleChainId, null);
|
||||
} catch (JsonProcessingException | IllegalArgumentException e) {
|
||||
throw new RuntimeException("Failed to process " + id.getEntityType().name().toLowerCase() + " " + action + " msg: " + e);
|
||||
}
|
||||
|
||||
@ -293,7 +293,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
|
||||
try {
|
||||
checkComponentStateActive(msg);
|
||||
EntityId entityId = msg.getOriginator();
|
||||
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueId(), tenantId, entityId);
|
||||
TopicPartitionInfo tpi = systemContext.resolve(ServiceType.TB_RULE_ENGINE, msg.getQueueName(), tenantId, entityId);
|
||||
|
||||
List<RuleNodeRelation> ruleNodeRelations = nodeRoutes.get(originatorNodeId);
|
||||
if (ruleNodeRelations == null) { // When unchecked, this will cause NullPointerException when rule node doesn't exist anymore
|
||||
|
||||
@ -48,9 +48,8 @@ public class DeviceProfileMsgConstructor {
|
||||
// builder.setDefaultRuleChainIdMSB(deviceProfile.getDefaultRuleChainId().getId().getMostSignificantBits())
|
||||
// .setDefaultRuleChainIdLSB(deviceProfile.getDefaultRuleChainId().getId().getLeastSignificantBits());
|
||||
// }
|
||||
if (deviceProfile.getDefaultQueueId() != null) {
|
||||
builder.setDefaultQueueIdMSB(deviceProfile.getDefaultQueueId().getId().getMostSignificantBits())
|
||||
.setDefaultQueueIdLSB(deviceProfile.getDefaultQueueId().getId().getLeastSignificantBits());
|
||||
if (deviceProfile.getDefaultQueueName() != null) {
|
||||
builder.setDefaultQueueName(deviceProfile.getDefaultQueueName());
|
||||
}
|
||||
if (deviceProfile.getDescription() != null) {
|
||||
builder.setDescription(deviceProfile.getDescription());
|
||||
|
||||
@ -158,21 +158,21 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
return metaData;
|
||||
}
|
||||
|
||||
private Pair<QueueId, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
|
||||
private Pair<String, RuleChainId> getDefaultQueueNameAndRuleChainId(TenantId tenantId, EntityId entityId) {
|
||||
if (EntityType.DEVICE.equals(entityId.getEntityType())) {
|
||||
DeviceProfile deviceProfile = deviceProfileCache.get(tenantId, new DeviceId(entityId.getId()));
|
||||
RuleChainId ruleChainId;
|
||||
QueueId queueId;
|
||||
String queueName;
|
||||
|
||||
if (deviceProfile == null) {
|
||||
log.warn("[{}] Device profile is null!", entityId);
|
||||
ruleChainId = null;
|
||||
queueId = null;
|
||||
queueName = null;
|
||||
} else {
|
||||
ruleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
queueId = deviceProfile.getDefaultQueueId();
|
||||
queueName = deviceProfile.getDefaultQueueName();
|
||||
}
|
||||
return new ImmutablePair<>(queueId, ruleChainId);
|
||||
return new ImmutablePair<>(queueName, ruleChainId);
|
||||
} else {
|
||||
return new ImmutablePair<>(null, null);
|
||||
}
|
||||
@ -183,10 +183,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
|
||||
JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
|
||||
metaData.putValue("ts", tsKv.getTs() + "");
|
||||
Pair<QueueId, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
QueueId queueId = defaultQueueAndRuleChain.getKey();
|
||||
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
|
||||
TbMsg tbMsg = TbMsg.newMsg(queueId, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
|
||||
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
|
||||
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
@ -206,10 +204,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
private ListenableFuture<Void> processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
|
||||
SettableFuture<Void> futureToSet = SettableFuture.create();
|
||||
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
|
||||
Pair<QueueId, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
QueueId queueId = defaultQueueAndRuleChain.getKey();
|
||||
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
|
||||
TbMsg tbMsg = TbMsg.newMsg(queueId, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
|
||||
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
|
||||
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
@ -233,10 +229,8 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
|
||||
Futures.addCallback(future, new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(@Nullable List<String> keys) {
|
||||
Pair<QueueId, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
QueueId queueId = defaultQueueAndRuleChain.getKey();
|
||||
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();
|
||||
TbMsg tbMsg = TbMsg.newMsg(queueId, DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), ruleChainId, null);
|
||||
var defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
|
||||
TbMsg tbMsg = TbMsg.newMsg(defaultQueueAndRuleChain.getKey(), DataConstants.ATTRIBUTES_UPDATED, entityId, customerId, metaData, gson.toJson(json), defaultQueueAndRuleChain.getValue(), null);
|
||||
tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() {
|
||||
@Override
|
||||
public void onSuccess(TbQueueMsgMetadata metadata) {
|
||||
|
||||
@ -48,13 +48,11 @@ import java.util.stream.Collectors;
|
||||
@TbCoreComponent
|
||||
@AllArgsConstructor
|
||||
public class DefaultTbQueueService extends AbstractTbEntityService implements TbQueueService {
|
||||
private static final String MAIN = "Main";
|
||||
private static final long DELETE_DELAY = 30;
|
||||
|
||||
private final QueueService queueService;
|
||||
private final TbClusterService tbClusterService;
|
||||
private final TbQueueAdmin tbQueueAdmin;
|
||||
private final DeviceProfileService deviceProfileService;
|
||||
private final SchedulerComponent scheduler;
|
||||
|
||||
@Override
|
||||
@ -205,35 +203,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
|
||||
}
|
||||
|
||||
tenantIds.forEach(tenantId -> {
|
||||
Map<QueueId, List<DeviceProfile>> deviceProfileQueues;
|
||||
|
||||
if (oldTenantProfile != null && !newTenantProfile.getId().equals(oldTenantProfile.getId()) || !toRemove.isEmpty()) {
|
||||
List<DeviceProfile> deviceProfiles = deviceProfileService.findDeviceProfiles(tenantId, new PageLink(Integer.MAX_VALUE)).getData();
|
||||
deviceProfileQueues = deviceProfiles.stream()
|
||||
.filter(dp -> dp.getDefaultQueueId() != null)
|
||||
.collect(Collectors.groupingBy(DeviceProfile::getDefaultQueueId));
|
||||
} else {
|
||||
deviceProfileQueues = Collections.emptyMap();
|
||||
}
|
||||
|
||||
Map<String, QueueId> createdQueues = toCreate.stream()
|
||||
.map(key -> saveQueue(new Queue(tenantId, newQueues.get(key))))
|
||||
.collect(Collectors.toMap(Queue::getName, Queue::getId));
|
||||
|
||||
// assigning created queues to device profiles instead of system queues
|
||||
if (oldTenantProfile != null && !oldTenantProfile.isIsolatedTbRuleEngine()) {
|
||||
deviceProfileQueues.forEach((queueId, list) -> {
|
||||
Queue queue = queueService.findQueueById(TenantId.SYS_TENANT_ID, queueId);
|
||||
QueueId queueIdToAssign = createdQueues.get(queue.getName());
|
||||
if (queueIdToAssign == null) {
|
||||
queueIdToAssign = createdQueues.get(MAIN);
|
||||
}
|
||||
for (DeviceProfile deviceProfile : list) {
|
||||
deviceProfile.setDefaultQueueId(queueIdToAssign);
|
||||
saveDeviceProfile(deviceProfile);
|
||||
}
|
||||
});
|
||||
}
|
||||
toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key))));
|
||||
|
||||
toUpdate.forEach(key -> {
|
||||
Queue queueToUpdate = new Queue(tenantId, newQueues.get(key));
|
||||
@ -241,9 +211,7 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
|
||||
queueToUpdate.setId(foundQueue.getId());
|
||||
queueToUpdate.setCreatedTime(foundQueue.getCreatedTime());
|
||||
|
||||
if (queueToUpdate.equals(foundQueue)) {
|
||||
//Queue not changed
|
||||
} else {
|
||||
if (!queueToUpdate.equals(foundQueue)) {
|
||||
saveQueue(queueToUpdate);
|
||||
}
|
||||
});
|
||||
@ -251,26 +219,9 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
|
||||
toRemove.forEach(q -> {
|
||||
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q);
|
||||
QueueId queueIdForRemove = queue.getId();
|
||||
if (deviceProfileQueues.containsKey(queueIdForRemove)) {
|
||||
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, q);
|
||||
if (foundQueue == null || queue.equals(foundQueue)) {
|
||||
foundQueue = queueService.findQueueByTenantIdAndName(tenantId, MAIN);
|
||||
}
|
||||
QueueId newQueueId = foundQueue.getId();
|
||||
deviceProfileQueues.get(queueIdForRemove).stream()
|
||||
.peek(dp -> dp.setDefaultQueueId(newQueueId))
|
||||
.forEach(this::saveDeviceProfile);
|
||||
}
|
||||
deleteQueue(tenantId, queueIdForRemove);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
//TODO: remove after implementing TbDeviceProfileService
|
||||
private void saveDeviceProfile(DeviceProfile deviceProfile) {
|
||||
DeviceProfile savedDeviceProfile = deviceProfileService.saveDeviceProfile(deviceProfile);
|
||||
tbClusterService.onDeviceProfileChange(savedDeviceProfile, null);
|
||||
tbClusterService.broadcastEntityStateChangeEvent(deviceProfile.getTenantId(), savedDeviceProfile.getId(), ComponentLifecycleEvent.UPDATED);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -589,10 +589,6 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
log.info("Updating device profiles...");
|
||||
schemaUpdateFile = Paths.get(installScripts.getDataDir(), "upgrade", "3.3.4", "schema_update_device_profile.sql");
|
||||
loadSql(schemaUpdateFile, conn);
|
||||
|
||||
log.info("Updating schema settings...");
|
||||
conn.createStatement().execute("UPDATE tb_schema_settings SET schema_version = 3004000;");
|
||||
log.info("Schema updated.");
|
||||
|
||||
@ -158,7 +158,6 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
log.info("Updating data from version 3.3.4 to 3.4.0 ...");
|
||||
rateLimitsUpdater.updateEntities();
|
||||
tenantsProfileQueueConfigurationUpdater.updateEntities();
|
||||
checkPointRuleNodesUpdater.updateEntities();
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Unable to update data, unsupported fromVersion: " + fromVersion);
|
||||
@ -629,47 +628,4 @@ public class DefaultDataUpdateService implements DataUpdateService {
|
||||
return mainQueueConfiguration;
|
||||
}
|
||||
|
||||
private final PaginatedUpdater<String, RuleNode> checkPointRuleNodesUpdater =
|
||||
new PaginatedUpdater<>() {
|
||||
|
||||
@Override
|
||||
protected String getName() {
|
||||
return "Checkpoint rule nodes updater";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean forceReportTotal() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PageData<RuleNode> findEntities(String id, PageLink pageLink) {
|
||||
return ruleChainService.findAllRuleNodesByType("org.thingsboard.rule.engine.flow.TbCheckpointNode", pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void updateEntity(RuleNode ruleNode) {
|
||||
updateCheckPointRuleNodeConfiguration(ruleNode);
|
||||
}
|
||||
};
|
||||
|
||||
private void updateCheckPointRuleNodeConfiguration(RuleNode node) {
|
||||
try {
|
||||
ObjectNode configuration = (ObjectNode) node.getConfiguration();
|
||||
JsonNode queueNameNode = configuration.remove("queueName");
|
||||
if (queueNameNode != null) {
|
||||
RuleChain ruleChain = this.ruleChainService.findRuleChainById(TenantId.SYS_TENANT_ID, node.getRuleChainId());
|
||||
TenantId tenantId = ruleChain.getTenantId();
|
||||
Map<String, QueueId> queues =
|
||||
queueService.findQueuesByTenantId(tenantId).stream().collect(Collectors.toMap(Queue::getName, Queue::getId));
|
||||
String queueName = queueNameNode.asText();
|
||||
QueueId queueId = queues.get(queueName);
|
||||
configuration.put("queueId", queueId != null ? queueId.toString() : "");
|
||||
ruleChainService.saveRuleNode(tenantId, node);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to update checkpoint rule node configuration name=["+node.getName()+"], id=["+ node.getId().getId() +"]", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -181,7 +181,7 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId())));
|
||||
}
|
||||
}
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueId(), tenantId, entityId);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId);
|
||||
log.trace("PUSHING msg: {} to:{}", tbMsg, tpi);
|
||||
ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
@ -194,16 +194,16 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
private TbMsg transformMsg(TbMsg tbMsg, DeviceProfile deviceProfile) {
|
||||
if (deviceProfile != null) {
|
||||
RuleChainId targetRuleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
QueueId targetQueueId = deviceProfile.getDefaultQueueId();
|
||||
String targetQueueName = deviceProfile.getDefaultQueueName();
|
||||
boolean isRuleChainTransform = targetRuleChainId != null && !targetRuleChainId.equals(tbMsg.getRuleChainId());
|
||||
boolean isQueueTransform = targetQueueId != null && !targetQueueId.equals(tbMsg.getQueueId());
|
||||
boolean isQueueTransform = targetQueueName != null && !targetQueueName.equals(tbMsg.getQueueName());
|
||||
|
||||
if (isRuleChainTransform && isQueueTransform) {
|
||||
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueId);
|
||||
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId, targetQueueName);
|
||||
} else if (isRuleChainTransform) {
|
||||
tbMsg = TbMsg.transformMsg(tbMsg, targetRuleChainId);
|
||||
} else if (isQueueTransform) {
|
||||
tbMsg = TbMsg.transformMsg(tbMsg, targetQueueId);
|
||||
tbMsg = TbMsg.transformMsg(tbMsg, targetQueueName);
|
||||
}
|
||||
}
|
||||
return tbMsg;
|
||||
|
||||
@ -274,7 +274,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
|
||||
final boolean timeout = !ctx.await(configuration.getPackProcessingTimeout(), TimeUnit.MILLISECONDS);
|
||||
|
||||
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getId(), timeout, ctx);
|
||||
TbRuleEngineProcessingResult result = new TbRuleEngineProcessingResult(configuration.getName(), timeout, ctx);
|
||||
if (timeout) {
|
||||
printFirstOrAll(configuration, ctx, ctx.getPendingMap(), "Timeout");
|
||||
}
|
||||
@ -339,7 +339,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
new TbMsgPackCallback(id, tenantId, ctx);
|
||||
try {
|
||||
if (toRuleEngineMsg.getTbMsg() != null && !toRuleEngineMsg.getTbMsg().isEmpty()) {
|
||||
forwardToRuleEngineActor(configuration.getId(), tenantId, toRuleEngineMsg, callback);
|
||||
forwardToRuleEngineActor(configuration.getName(), tenantId, toRuleEngineMsg, callback);
|
||||
} else {
|
||||
callback.onSuccess();
|
||||
}
|
||||
@ -353,7 +353,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
log.info("{} to process [{}] messages", prefix, map.size());
|
||||
for (Map.Entry<UUID, TbProtoQueueMsg<ToRuleEngineMsg>> pending : map.entrySet()) {
|
||||
ToRuleEngineMsg tmp = pending.getValue().getValue();
|
||||
TbMsg tmpMsg = TbMsg.fromBytes(configuration.getId(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
|
||||
TbMsg tmpMsg = TbMsg.fromBytes(configuration.getName(), tmp.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
|
||||
RuleNodeInfo ruleNodeInfo = ctx.getLastVisitedRuleNode(pending.getKey());
|
||||
if (printAll) {
|
||||
log.trace("[{}] {} to process message: {}, Last Rule Node: {}", TenantId.fromUUID(new UUID(tmp.getTenantIdMSB(), tmp.getTenantIdLSB())), prefix, tmpMsg, ruleNodeInfo);
|
||||
@ -461,8 +461,8 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
partitionService.removeQueue(queueDeleteMsg);
|
||||
}
|
||||
|
||||
private void forwardToRuleEngineActor(QueueId queueId, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
|
||||
TbMsg tbMsg = TbMsg.fromBytes(queueId, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
|
||||
private void forwardToRuleEngineActor(String queueName, TenantId tenantId, ToRuleEngineMsg toRuleEngineMsg, TbMsgCallback callback) {
|
||||
TbMsg tbMsg = TbMsg.fromBytes(queueName, toRuleEngineMsg.getTbMsg().toByteArray(), callback);
|
||||
QueueToRuleEngineMsg msg;
|
||||
ProtocolStringList relationTypesList = toRuleEngineMsg.getRelationTypesList();
|
||||
Set<String> relationTypes = null;
|
||||
|
||||
@ -29,7 +29,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||
public class TbRuleEngineProcessingResult {
|
||||
|
||||
@Getter
|
||||
private final QueueId queueId;
|
||||
private final String queueName;
|
||||
@Getter
|
||||
private final boolean success;
|
||||
@Getter
|
||||
@ -37,8 +37,8 @@ public class TbRuleEngineProcessingResult {
|
||||
@Getter
|
||||
private final TbMsgPackProcessingContext ctx;
|
||||
|
||||
public TbRuleEngineProcessingResult(QueueId queueId, boolean timeout, TbMsgPackProcessingContext ctx) {
|
||||
this.queueId = queueId;
|
||||
public TbRuleEngineProcessingResult(String queueName, boolean timeout, TbMsgPackProcessingContext ctx) {
|
||||
this.queueName = queueName;
|
||||
this.timeout = timeout;
|
||||
this.ctx = ctx;
|
||||
this.success = !timeout && ctx.getPendingMap().isEmpty() && ctx.getFailedMap().isEmpty();
|
||||
|
||||
@ -125,7 +125,7 @@ public class TbRuleEngineProcessingStrategyFactory {
|
||||
}
|
||||
log.debug("[{}] Going to reprocess {} messages", queueName, toReprocess.size());
|
||||
if (log.isTraceEnabled()) {
|
||||
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
toReprocess.forEach((id, msg) -> log.trace("Going to reprocess [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
}
|
||||
if (pauseBetweenRetries > 0) {
|
||||
try {
|
||||
@ -164,10 +164,10 @@ public class TbRuleEngineProcessingStrategyFactory {
|
||||
log.debug("[{}] Reprocessing skipped for {} failed and {} timeout messages", queueName, result.getFailedMap().size(), result.getPendingMap().size());
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
result.getFailedMap().forEach((id, msg) -> log.trace("Failed messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueId(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
result.getPendingMap().forEach((id, msg) -> log.trace("Timeout messages [{}]: {}", id, TbMsg.fromBytes(result.getQueueName(), msg.getValue().getTbMsg().toByteArray(), TbMsgCallback.EMPTY)));
|
||||
}
|
||||
return new TbRuleEngineProcessingDecision(true, null);
|
||||
}
|
||||
|
||||
@ -53,7 +53,6 @@ public class DeviceProfileImportService extends BaseEntityImportService<DevicePr
|
||||
deviceProfile.setDefaultDashboardId(idProvider.getInternalId(deviceProfile.getDefaultDashboardId()));
|
||||
deviceProfile.setFirmwareId(getOldEntityField(old, DeviceProfile::getFirmwareId));
|
||||
deviceProfile.setSoftwareId(getOldEntityField(old, DeviceProfile::getSoftwareId));
|
||||
deviceProfile.setDefaultQueueId(getOldEntityField(old, DeviceProfile::getDefaultQueueId));
|
||||
return deviceProfile;
|
||||
}
|
||||
|
||||
|
||||
@ -422,12 +422,6 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
||||
}
|
||||
|
||||
protected DeviceProfile createDeviceProfile(String name, DeviceProfileTransportConfiguration deviceProfileTransportConfiguration) {
|
||||
return createDeviceProfile(name, deviceProfileTransportConfiguration, null);
|
||||
}
|
||||
|
||||
protected DeviceProfile createDeviceProfile(String name,
|
||||
DeviceProfileTransportConfiguration deviceProfileTransportConfiguration,
|
||||
QueueId defaultQueueId) {
|
||||
DeviceProfile deviceProfile = new DeviceProfile();
|
||||
deviceProfile.setName(name);
|
||||
deviceProfile.setType(DeviceProfileType.DEFAULT);
|
||||
@ -445,7 +439,6 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
|
||||
deviceProfile.setProfileData(deviceProfileData);
|
||||
deviceProfile.setDefault(false);
|
||||
deviceProfile.setDefaultRuleChainId(null);
|
||||
deviceProfile.setDefaultQueueId(defaultQueueId);
|
||||
return deviceProfile;
|
||||
}
|
||||
|
||||
|
||||
@ -39,7 +39,6 @@ import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.OtaPackageInfo;
|
||||
import org.thingsboard.server.common.data.SaveOtaPackageInfoRequest;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.TenantProfile;
|
||||
import org.thingsboard.server.common.data.User;
|
||||
import org.thingsboard.server.common.data.audit.ActionType;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileTransportConfiguration;
|
||||
@ -49,15 +48,8 @@ import org.thingsboard.server.common.data.device.profile.ProtoTransportPayloadCo
|
||||
import org.thingsboard.server.common.data.device.profile.TransportPayloadTypeConfiguration;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.queue.ProcessingStrategy;
|
||||
import org.thingsboard.server.common.data.queue.ProcessingStrategyType;
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategy;
|
||||
import org.thingsboard.server.common.data.queue.SubmitStrategyType;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.common.data.security.Authority;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
|
||||
import org.thingsboard.server.common.data.tenant.profile.TenantProfileQueueConfiguration;
|
||||
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -389,55 +381,6 @@ public abstract class BaseDeviceProfileControllerTest extends AbstractController
|
||||
.andExpect(statusReason(containsString("Can't assign dashboard from different tenant!")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveDeviceProfileWithQueueFromDifferentTenant() throws Exception {
|
||||
loginDifferentTenant();
|
||||
loginSysAdmin();
|
||||
TenantProfile tenantProfile = new TenantProfile();
|
||||
tenantProfile.setDefault(false);
|
||||
tenantProfile.setName("Isolated TB Rule Engine");
|
||||
tenantProfile.setDescription("Isolated TB Rule Engine tenant profile");
|
||||
tenantProfile.setIsolatedTbCore(false);
|
||||
tenantProfile.setIsolatedTbRuleEngine(true);
|
||||
|
||||
TenantProfileQueueConfiguration mainQueueConfiguration = new TenantProfileQueueConfiguration();
|
||||
mainQueueConfiguration.setName("Main");
|
||||
mainQueueConfiguration.setTopic("tb_rule_engine.main");
|
||||
mainQueueConfiguration.setPollInterval(25);
|
||||
mainQueueConfiguration.setPartitions(10);
|
||||
mainQueueConfiguration.setConsumerPerPartition(true);
|
||||
mainQueueConfiguration.setPackProcessingTimeout(2000);
|
||||
SubmitStrategy mainQueueSubmitStrategy = new SubmitStrategy();
|
||||
mainQueueSubmitStrategy.setType(SubmitStrategyType.BURST);
|
||||
mainQueueSubmitStrategy.setBatchSize(1000);
|
||||
mainQueueConfiguration.setSubmitStrategy(mainQueueSubmitStrategy);
|
||||
ProcessingStrategy mainQueueProcessingStrategy = new ProcessingStrategy();
|
||||
mainQueueProcessingStrategy.setType(ProcessingStrategyType.SKIP_ALL_FAILURES);
|
||||
mainQueueProcessingStrategy.setRetries(3);
|
||||
mainQueueProcessingStrategy.setFailurePercentage(0);
|
||||
mainQueueProcessingStrategy.setPauseBetweenRetries(3);
|
||||
mainQueueProcessingStrategy.setMaxPauseBetweenRetries(3);
|
||||
mainQueueConfiguration.setProcessingStrategy(mainQueueProcessingStrategy);
|
||||
TenantProfileData profileData = tenantProfile.getProfileData();
|
||||
profileData.setQueueConfiguration(Collections.singletonList(mainQueueConfiguration));
|
||||
tenantProfile.setProfileData(profileData);
|
||||
TenantProfile savedTenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
||||
savedDifferentTenant.setTenantProfileId(savedTenantProfile.getId());
|
||||
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
|
||||
loginDifferentTenant();
|
||||
PageLink pageLink = new PageLink(1);
|
||||
PageData<Queue> pageData = doGetTypedWithPageLink("/api/queues?serviceType=TB_RULE_ENGINE&",
|
||||
new TypeReference<>() {}, pageLink);
|
||||
Queue differentQueue = pageData.getData().get(0);
|
||||
|
||||
loginTenantAdmin();
|
||||
|
||||
DeviceProfile deviceProfile = this.createDeviceProfile("Device Profile");
|
||||
deviceProfile.setDefaultQueueId(differentQueue.getId());
|
||||
doPost("/api/deviceProfile", deviceProfile).andExpect(status().isBadRequest())
|
||||
.andExpect(statusReason(containsString("Can't assign queue from different tenant!")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSaveDeviceProfileWithFirmwareFromDifferentTenant() throws Exception {
|
||||
loginDifferentTenant();
|
||||
|
||||
@ -163,7 +163,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
private Tenant savedTenant;
|
||||
private TenantId tenantId;
|
||||
private User tenantAdmin;
|
||||
private QueueId defaultQueueId;
|
||||
|
||||
private DeviceProfile thermostatDeviceProfile;
|
||||
|
||||
@ -186,8 +185,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
tenantId = savedTenant.getId();
|
||||
Assert.assertNotNull(savedTenant);
|
||||
|
||||
defaultQueueId = getRandomQueueId();
|
||||
|
||||
tenantAdmin = new User();
|
||||
tenantAdmin.setAuthority(Authority.TENANT_ADMIN);
|
||||
tenantAdmin.setTenantId(savedTenant.getId());
|
||||
@ -378,7 +375,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
@Test
|
||||
public void testDeviceProfiles() throws Exception {
|
||||
// 1
|
||||
DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null, defaultQueueId);
|
||||
DeviceProfile deviceProfile = this.createDeviceProfile("ONE_MORE_DEVICE_PROFILE", null);
|
||||
extendDeviceProfileData(deviceProfile);
|
||||
edgeImitator.expectMessageAmount(1);
|
||||
deviceProfile = doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class);
|
||||
@ -389,8 +386,6 @@ abstract public class BaseEdgeTest extends AbstractControllerTest {
|
||||
Assert.assertEquals(UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, deviceProfileUpdateMsg.getMsgType());
|
||||
Assert.assertEquals(deviceProfileUpdateMsg.getIdMSB(), deviceProfile.getUuidId().getMostSignificantBits());
|
||||
Assert.assertEquals(deviceProfileUpdateMsg.getIdLSB(), deviceProfile.getUuidId().getLeastSignificantBits());
|
||||
Assert.assertEquals(defaultQueueId.getId().getMostSignificantBits(), deviceProfileUpdateMsg.getDefaultQueueIdMSB());
|
||||
Assert.assertEquals(defaultQueueId.getId().getLeastSignificantBits(), deviceProfileUpdateMsg.getDefaultQueueIdLSB());
|
||||
|
||||
// 2
|
||||
edgeImitator.expectMessageAmount(1);
|
||||
|
||||
@ -75,8 +75,6 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
|
||||
@ApiModelProperty(position = 6, value = "Reference to the dashboard. Used in the mobile application to open the default dashboard when user navigates to device details.")
|
||||
private DashboardId defaultDashboardId;
|
||||
|
||||
@JsonIgnore
|
||||
private QueueId defaultQueueId;
|
||||
@NoXss
|
||||
@ApiModelProperty(position = 8, value = "Rule engine queue name. " +
|
||||
"If present, the specified queue will be used to store all unprocessed messages related to device, including telemetry, attribute updates, etc. " +
|
||||
@ -114,7 +112,6 @@ public class DeviceProfile extends SearchTextBased<DeviceProfileId> implements H
|
||||
this.isDefault = deviceProfile.isDefault();
|
||||
this.defaultRuleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
this.defaultDashboardId = deviceProfile.getDefaultDashboardId();
|
||||
this.defaultQueueId = deviceProfile.getDefaultQueueId();
|
||||
this.defaultQueueName = deviceProfile.getDefaultQueueName();
|
||||
this.setProfileData(deviceProfile.getProfileData());
|
||||
this.provisionDeviceKey = deviceProfile.getProvisionDeviceKey();
|
||||
|
||||
@ -216,8 +216,6 @@ message DeviceProfileUpdateMsg {
|
||||
bytes profileDataBytes = 13;
|
||||
optional string provisionDeviceKey = 14;
|
||||
optional bytes image = 15;
|
||||
int64 defaultQueueIdMSB = 16;
|
||||
int64 defaultQueueIdLSB = 17;
|
||||
}
|
||||
|
||||
message DeviceCredentialsUpdateMsg {
|
||||
|
||||
@ -43,7 +43,7 @@ import java.util.UUID;
|
||||
@Slf4j
|
||||
public final class TbMsg implements Serializable {
|
||||
|
||||
private final QueueId queueId;
|
||||
private final String queueName;
|
||||
private final UUID id;
|
||||
private final long ts;
|
||||
private final String type;
|
||||
@ -67,12 +67,12 @@ public final class TbMsg implements Serializable {
|
||||
return ctx.getAndIncrementRuleNodeCounter();
|
||||
}
|
||||
|
||||
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return newMsg(queueId, type, originator, null, metaData, data, ruleChainId, ruleNodeId);
|
||||
public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return newMsg(queueName, type, originator, null, metaData, data, ruleChainId, ruleNodeId);
|
||||
}
|
||||
|
||||
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(queueId, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
metaData.copy(), TbMsgDataType.JSON, data, ruleChainId, ruleNodeId, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@ -87,12 +87,12 @@ public final class TbMsg implements Serializable {
|
||||
|
||||
// REALLY NEW MSG
|
||||
|
||||
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return newMsg(queueId, type, originator, null, metaData, data);
|
||||
public static TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return newMsg(queueName, type, originator, null, metaData, data);
|
||||
}
|
||||
|
||||
public static TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(queueId, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
public static TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(queueName, UUID.randomUUID(), System.currentTimeMillis(), type, originator, customerId,
|
||||
metaData.copy(), TbMsgDataType.JSON, data, null, null, null, TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
@ -118,40 +118,40 @@ public final class TbMsg implements Serializable {
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) {
|
||||
return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, type, originator, tbMsg.customerId, metaData.copy(), tbMsg.dataType,
|
||||
data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.callback);
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, CustomerId customerId) {
|
||||
return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, tbMsg.ruleChainId, tbMsg.ruleNodeId, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId) {
|
||||
return new TbMsg(tbMsg.queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
return new TbMsg(tbMsg.queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, QueueId queueId) {
|
||||
return new TbMsg(queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, String queueName) {
|
||||
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, tbMsg.getRuleChainId(), null, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, QueueId queueId) {
|
||||
return new TbMsg(queueId, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
public static TbMsg transformMsg(TbMsg tbMsg, RuleChainId ruleChainId, String queueName) {
|
||||
return new TbMsg(queueName, tbMsg.id, tbMsg.ts, tbMsg.type, tbMsg.originator, tbMsg.customerId, tbMsg.metaData, tbMsg.dataType,
|
||||
tbMsg.data, ruleChainId, null, tbMsg.ctx.copy(), tbMsg.getCallback());
|
||||
}
|
||||
|
||||
//used for enqueueForTellNext
|
||||
public static TbMsg newMsg(TbMsg tbMsg, QueueId queueId, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(queueId, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
|
||||
public static TbMsg newMsg(TbMsg tbMsg, String queueName, RuleChainId ruleChainId, RuleNodeId ruleNodeId) {
|
||||
return new TbMsg(queueName, UUID.randomUUID(), tbMsg.getTs(), tbMsg.getType(), tbMsg.getOriginator(), tbMsg.customerId, tbMsg.getMetaData().copy(),
|
||||
tbMsg.getDataType(), tbMsg.getData(), ruleChainId, ruleNodeId, tbMsg.ctx.copy(), TbMsgCallback.EMPTY);
|
||||
}
|
||||
|
||||
private TbMsg(QueueId queueId, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||
private TbMsg(String queueName, UUID id, long ts, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, TbMsgDataType dataType, String data,
|
||||
RuleChainId ruleChainId, RuleNodeId ruleNodeId, TbMsgProcessingCtx ctx, TbMsgCallback callback) {
|
||||
this.id = id;
|
||||
this.queueId = queueId;
|
||||
this.queueName = queueName;
|
||||
if (ts > 0) {
|
||||
this.ts = ts;
|
||||
} else {
|
||||
@ -220,7 +220,7 @@ public final class TbMsg implements Serializable {
|
||||
return builder.build().toByteArray();
|
||||
}
|
||||
|
||||
public static TbMsg fromBytes(QueueId queueId, byte[] data, TbMsgCallback callback) {
|
||||
public static TbMsg fromBytes(String queueName, byte[] data, TbMsgCallback callback) {
|
||||
try {
|
||||
MsgProtos.TbMsgProto proto = MsgProtos.TbMsgProto.parseFrom(data);
|
||||
TbMsgMetaData metaData = new TbMsgMetaData(proto.getMetaData().getDataMap());
|
||||
@ -247,7 +247,7 @@ public final class TbMsg implements Serializable {
|
||||
}
|
||||
|
||||
TbMsgDataType dataType = TbMsgDataType.values()[proto.getDataType()];
|
||||
return new TbMsg(queueId, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId,
|
||||
return new TbMsg(queueName, UUID.fromString(proto.getId()), proto.getTs(), proto.getType(), entityId, customerId,
|
||||
metaData, dataType, proto.getData(), ruleChainId, ruleNodeId, ctx, callback);
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
throw new IllegalStateException("Could not parse protobuf for TbMsg", e);
|
||||
@ -259,12 +259,12 @@ public final class TbMsg implements Serializable {
|
||||
}
|
||||
|
||||
public TbMsg copyWithRuleChainId(RuleChainId ruleChainId, UUID msgId) {
|
||||
return new TbMsg(this.queueId, msgId, this.ts, this.type, this.originator, this.customerId,
|
||||
return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId,
|
||||
this.metaData, this.dataType, this.data, ruleChainId, null, this.ctx, callback);
|
||||
}
|
||||
|
||||
public TbMsg copyWithRuleNodeId(RuleChainId ruleChainId, RuleNodeId ruleNodeId, UUID msgId) {
|
||||
return new TbMsg(this.queueId, msgId, this.ts, this.type, this.originator, this.customerId,
|
||||
return new TbMsg(this.queueName, msgId, this.ts, this.type, this.originator, this.customerId,
|
||||
this.metaData, this.dataType, this.data, ruleChainId, ruleNodeId, this.ctx, callback);
|
||||
}
|
||||
|
||||
|
||||
@ -68,8 +68,6 @@ public class HashPartitionService implements PartitionService {
|
||||
private final TenantRoutingInfoService tenantRoutingInfoService;
|
||||
private final QueueRoutingInfoService queueRoutingInfoService;
|
||||
|
||||
private final ConcurrentMap<QueueId, QueueRoutingInfo> queuesById = new ConcurrentHashMap<>();
|
||||
|
||||
private ConcurrentMap<QueueKey, List<Integer>> myPartitions = new ConcurrentHashMap<>();
|
||||
|
||||
private final ConcurrentMap<QueueKey, String> partitionTopicsMap = new ConcurrentHashMap<>();
|
||||
@ -121,7 +119,6 @@ public class HashPartitionService implements PartitionService {
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queue);
|
||||
partitionTopicsMap.put(queueKey, queue.getQueueTopic());
|
||||
partitionSizesMap.put(queueKey, queue.getPartitions());
|
||||
queuesById.put(queue.getQueueId(), queue);
|
||||
});
|
||||
}
|
||||
|
||||
@ -165,8 +162,6 @@ public class HashPartitionService implements PartitionService {
|
||||
public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
|
||||
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
|
||||
QueueRoutingInfo queue = new QueueRoutingInfo(queueUpdateMsg);
|
||||
queuesById.put(queue.getQueueId(), queue);
|
||||
partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
|
||||
partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
|
||||
myPartitions.remove(queueKey);
|
||||
@ -176,7 +171,6 @@ public class HashPartitionService implements PartitionService {
|
||||
public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
|
||||
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||
queuesById.remove(new QueueId(new UUID(queueDeleteMsg.getQueueIdMSB(), queueDeleteMsg.getQueueIdLSB())));
|
||||
myPartitions.remove(queueKey);
|
||||
partitionTopicsMap.remove(queueKey);
|
||||
partitionSizesMap.remove(queueKey);
|
||||
@ -185,9 +179,7 @@ public class HashPartitionService implements PartitionService {
|
||||
}
|
||||
|
||||
@Override
|
||||
@Deprecated
|
||||
public TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId, String queueName) {
|
||||
log.warn("This method is deprecated and will be removed!!!");
|
||||
public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) {
|
||||
TenantId isolatedOrSystemTenantId = getIsolatedOrSystemTenantId(serviceType, tenantId);
|
||||
QueueKey queueKey = new QueueKey(serviceType, queueName, isolatedOrSystemTenantId);
|
||||
if (!partitionSizesMap.containsKey(queueKey)) {
|
||||
@ -201,32 +193,6 @@ public class HashPartitionService implements PartitionService {
|
||||
return resolve(serviceType, null, tenantId, entityId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId) {
|
||||
QueueKey queueKey;
|
||||
if (queueId == null) {
|
||||
queueKey = getMainQueueKey(serviceType, tenantId);
|
||||
} else {
|
||||
QueueRoutingInfo queueRoutingInfo = queuesById.get(queueId);
|
||||
|
||||
if (queueRoutingInfo == null) {
|
||||
log.debug("Queue was removed but still used in CheckPoint rule node. [{}][{}]", tenantId, entityId);
|
||||
queueKey = getMainQueueKey(serviceType, tenantId);
|
||||
} else if (!queueRoutingInfo.getTenantId().equals(getIsolatedOrSystemTenantId(serviceType, tenantId))) {
|
||||
log.debug("Tenant profile was changed but CheckPoint rule node still uses the queue from system level. [{}][{}]", tenantId, entityId);
|
||||
queueKey = getMainQueueKey(serviceType, tenantId);
|
||||
} else {
|
||||
queueKey = new QueueKey(serviceType, queueRoutingInfo);
|
||||
}
|
||||
}
|
||||
|
||||
return resolve(queueKey, entityId);
|
||||
}
|
||||
|
||||
private QueueKey getMainQueueKey(ServiceType serviceType, TenantId tenantId) {
|
||||
return new QueueKey(serviceType, getIsolatedOrSystemTenantId(serviceType, tenantId));
|
||||
}
|
||||
|
||||
private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
|
||||
int hash = hashFunction.newHasher()
|
||||
.putLong(entityId.getId().getMostSignificantBits())
|
||||
|
||||
@ -32,13 +32,10 @@ import java.util.UUID;
|
||||
*/
|
||||
public interface PartitionService {
|
||||
|
||||
@Deprecated
|
||||
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId, String queueName);
|
||||
TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId);
|
||||
|
||||
TopicPartitionInfo resolve(ServiceType serviceType, TenantId tenantId, EntityId entityId);
|
||||
|
||||
TopicPartitionInfo resolve(ServiceType serviceType, QueueId queueId, TenantId tenantId, EntityId entityId);
|
||||
|
||||
/**
|
||||
* Received from the Discovery service when network topology is changed.
|
||||
* @param currentService - current service information {@link org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo}
|
||||
|
||||
@ -1088,7 +1088,7 @@ public class DefaultTransportService implements TransportService {
|
||||
}
|
||||
|
||||
private void sendToRuleEngine(TenantId tenantId, TbMsg tbMsg, TbQueueCallback callback) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueId(), tenantId, tbMsg.getOriginator());
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, tbMsg.getOriginator());
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("[{}][{}] Pushing to topic {} message {}", tenantId, tbMsg.getOriginator(), tpi.getFullTopicName(), tbMsg);
|
||||
}
|
||||
@ -1105,18 +1105,18 @@ public class DefaultTransportService implements TransportService {
|
||||
DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(sessionInfo.getDeviceProfileIdMSB(), sessionInfo.getDeviceProfileIdLSB()));
|
||||
DeviceProfile deviceProfile = deviceProfileCache.get(deviceProfileId);
|
||||
RuleChainId ruleChainId;
|
||||
QueueId queueId;
|
||||
String queueName;
|
||||
|
||||
if (deviceProfile == null) {
|
||||
log.warn("[{}] Device profile is null!", deviceProfileId);
|
||||
ruleChainId = null;
|
||||
queueId = null;
|
||||
queueName = null;
|
||||
} else {
|
||||
ruleChainId = deviceProfile.getDefaultRuleChainId();
|
||||
queueId = deviceProfile.getDefaultQueueId();
|
||||
queueName = deviceProfile.getDefaultQueueName();
|
||||
}
|
||||
|
||||
TbMsg tbMsg = TbMsg.newMsg(queueId, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
|
||||
TbMsg tbMsg = TbMsg.newMsg(queueName, sessionMsgType.name(), deviceId, customerId, metaData, gson.toJson(json), ruleChainId, null);
|
||||
sendToRuleEngine(tenantId, tbMsg, callback);
|
||||
}
|
||||
|
||||
|
||||
@ -120,12 +120,6 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
|
||||
DeviceProfile oldDeviceProfile = deviceProfileValidator.validate(deviceProfile, DeviceProfile::getTenantId);
|
||||
DeviceProfile savedDeviceProfile;
|
||||
try {
|
||||
if (StringUtils.isNotEmpty(deviceProfile.getDefaultQueueName())) {
|
||||
Queue existing = queueService.findQueueByTenantIdAndName(deviceProfile.getTenantId(), deviceProfile.getDefaultQueueName());
|
||||
if (existing != null) {
|
||||
deviceProfile.setDefaultQueueId(existing.getId());
|
||||
}
|
||||
}
|
||||
savedDeviceProfile = deviceProfileDao.saveAndFlush(deviceProfile.getTenantId(), deviceProfile);
|
||||
publishEvictEvent(new DeviceProfileEvictEvent(savedDeviceProfile.getTenantId(), savedDeviceProfile.getName(),
|
||||
oldDeviceProfile != null ? oldDeviceProfile.getName() : null, savedDeviceProfile.getId(), savedDeviceProfile.isDefault()));
|
||||
|
||||
@ -88,9 +88,6 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
|
||||
@Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_DASHBOARD_ID_PROPERTY)
|
||||
private UUID defaultDashboardId;
|
||||
|
||||
@Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_QUEUE_ID_PROPERTY)
|
||||
private UUID defaultQueueId;
|
||||
|
||||
@Column(name = ModelConstants.DEVICE_PROFILE_DEFAULT_QUEUE_NAME_PROPERTY)
|
||||
private String defaultQueueName;
|
||||
|
||||
@ -137,9 +134,6 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
|
||||
this.defaultDashboardId = deviceProfile.getDefaultDashboardId().getId();
|
||||
}
|
||||
this.defaultQueueName = deviceProfile.getDefaultQueueName();
|
||||
if (deviceProfile.getDefaultQueueId() != null) {
|
||||
this.defaultQueueId = deviceProfile.getDefaultQueueId().getId();
|
||||
}
|
||||
this.provisionDeviceKey = deviceProfile.getProvisionDeviceKey();
|
||||
if (deviceProfile.getFirmwareId() != null) {
|
||||
this.firmwareId = deviceProfile.getFirmwareId().getId();
|
||||
@ -188,9 +182,6 @@ public final class DeviceProfileEntity extends BaseSqlEntity<DeviceProfile> impl
|
||||
if (defaultDashboardId != null) {
|
||||
deviceProfile.setDefaultDashboardId(new DashboardId(defaultDashboardId));
|
||||
}
|
||||
if (defaultQueueId != null) {
|
||||
deviceProfile.setDefaultQueueId(new QueueId(defaultQueueId));
|
||||
}
|
||||
deviceProfile.setProvisionDeviceKey(provisionDeviceKey);
|
||||
|
||||
if (firmwareId != null) {
|
||||
|
||||
@ -126,16 +126,11 @@ public class DeviceProfileDataValidator extends AbstractHasOtaPackageValidator<D
|
||||
throw new DataValidationException("Another default device profile is present in scope of current tenant!");
|
||||
}
|
||||
}
|
||||
if (deviceProfile.getDefaultQueueId() != null) {
|
||||
Queue queue = queueService.findQueueById(tenantId, deviceProfile.getDefaultQueueId());
|
||||
if (deviceProfile.getDefaultQueueName() != null) {
|
||||
Queue queue = queueService.findQueueByTenantIdAndName(tenantId, deviceProfile.getDefaultQueueName());
|
||||
if (queue == null) {
|
||||
throw new DataValidationException("Device profile is referencing to non-existent queue!");
|
||||
}
|
||||
TenantProfile tenantProfile = tenantProfileCache.get(deviceProfile.getTenantId());
|
||||
if ((tenantProfile.isIsolatedTbRuleEngine() && !queue.getTenantId().equals(deviceProfile.getTenantId()))
|
||||
|| (!tenantProfile.isIsolatedTbRuleEngine() && !queue.getTenantId().isNullUid())) {
|
||||
throw new DataValidationException("Can't assign queue from different tenant!");
|
||||
}
|
||||
}
|
||||
if (deviceProfile.getProvisionType() == null) {
|
||||
deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED);
|
||||
|
||||
@ -253,7 +253,6 @@ CREATE TABLE IF NOT EXISTS device_profile (
|
||||
software_id uuid,
|
||||
default_rule_chain_id uuid,
|
||||
default_dashboard_id uuid,
|
||||
default_queue_id uuid,
|
||||
default_queue_name varchar(255),
|
||||
provision_device_key varchar,
|
||||
external_id uuid,
|
||||
@ -261,7 +260,6 @@ CREATE TABLE IF NOT EXISTS device_profile (
|
||||
CONSTRAINT device_provision_key_unq_key UNIQUE (provision_device_key),
|
||||
CONSTRAINT fk_default_rule_chain_device_profile FOREIGN KEY (default_rule_chain_id) REFERENCES rule_chain(id),
|
||||
CONSTRAINT fk_default_dashboard_device_profile FOREIGN KEY (default_dashboard_id) REFERENCES dashboard(id),
|
||||
CONSTRAINT fk_default_queue_device_profile FOREIGN KEY (default_queue_id) REFERENCES queue(id),
|
||||
CONSTRAINT fk_firmware_device_profile FOREIGN KEY (firmware_id) REFERENCES ota_package(id),
|
||||
CONSTRAINT fk_software_device_profile FOREIGN KEY (software_id) REFERENCES ota_package(id)
|
||||
);
|
||||
|
||||
@ -142,15 +142,12 @@ public interface TbContext {
|
||||
*/
|
||||
void output(TbMsg msg, String relationType);
|
||||
|
||||
@Deprecated
|
||||
void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
|
||||
/**
|
||||
* Puts new message to custom queue for processing
|
||||
*
|
||||
* @param msg - message
|
||||
*/
|
||||
void enqueue(TbMsg msg, QueueId queueId, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
void enqueue(TbMsg msg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
|
||||
void enqueueForTellFailure(TbMsg msg, String failureMessage);
|
||||
|
||||
@ -162,15 +159,15 @@ public interface TbContext {
|
||||
|
||||
void enqueueForTellNext(TbMsg msg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
|
||||
void enqueueForTellNext(TbMsg msg, QueueId queueId, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
void enqueueForTellNext(TbMsg msg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
|
||||
void enqueueForTellNext(TbMsg msg, QueueId queueId, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
void enqueueForTellNext(TbMsg msg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure);
|
||||
|
||||
void ack(TbMsg tbMsg);
|
||||
|
||||
TbMsg newMsg(QueueId queueId, String type, EntityId originator, TbMsgMetaData metaData, String data);
|
||||
TbMsg newMsg(String queueName, String type, EntityId originator, TbMsgMetaData metaData, String data);
|
||||
|
||||
TbMsg newMsg(QueueId queueId, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data);
|
||||
TbMsg newMsg(String queueName, String type, EntityId originator, CustomerId customerId, TbMsgMetaData metaData, String data);
|
||||
|
||||
TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data);
|
||||
|
||||
|
||||
@ -138,7 +138,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
|
||||
}
|
||||
|
||||
private void transformAndTellNext(TbContext ctx, TbMsg msg, EntityView entityView) {
|
||||
ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueId(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS);
|
||||
ctx.enqueueForTellNext(ctx.newMsg(msg.getQueueName(), msg.getType(), entityView.getId(), msg.getCustomerId(), msg.getMetaData(), msg.getData()), SUCCESS);
|
||||
}
|
||||
|
||||
private boolean attributeContainsInEntityView(String scope, String attrKey, EntityView entityView) {
|
||||
|
||||
@ -77,7 +77,7 @@ public class TbMsgCountNode implements TbNode {
|
||||
TbMsgMetaData metaData = new TbMsgMetaData();
|
||||
metaData.putValue("delta", Long.toString(System.currentTimeMillis() - lastScheduledTs + delay));
|
||||
|
||||
TbMsg tbMsg = TbMsg.newMsg(msg.getQueueId(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson));
|
||||
TbMsg tbMsg = TbMsg.newMsg(msg.getQueueName(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), ctx.getTenantId(), msg.getCustomerId(), metaData, gson.toJson(telemetryJson));
|
||||
ctx.enqueueForTellNext(tbMsg, SUCCESS);
|
||||
scheduleTickMsg(ctx, tbMsg);
|
||||
} else {
|
||||
|
||||
@ -41,17 +41,17 @@ import java.util.UUID;
|
||||
)
|
||||
public class TbCheckpointNode implements TbNode {
|
||||
|
||||
private QueueId queueId;
|
||||
private String queueName;
|
||||
|
||||
@Override
|
||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||
TbCheckpointNodeConfiguration config = TbNodeUtils.convert(configuration, TbCheckpointNodeConfiguration.class);
|
||||
this.queueId = new QueueId(UUID.fromString(config.getQueueId()));
|
||||
this.queueName = config.getQueueName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMsg(TbContext ctx, TbMsg msg) {
|
||||
ctx.enqueueForTellNext(msg, queueId, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
|
||||
ctx.enqueueForTellNext(msg, queueName, TbRelationTypes.SUCCESS, () -> ctx.ack(msg), error -> ctx.tellFailure(msg, error));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -60,7 +60,7 @@ class AlarmState {
|
||||
private volatile Alarm currentAlarm;
|
||||
private volatile boolean initialFetchDone;
|
||||
private volatile TbMsgMetaData lastMsgMetaData;
|
||||
private volatile QueueId lastMsgQueueId;
|
||||
private volatile String lastMsgQueueName;
|
||||
private volatile DataSnapshot dataSnapshot;
|
||||
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
|
||||
|
||||
@ -74,7 +74,7 @@ class AlarmState {
|
||||
public boolean process(TbContext ctx, TbMsg msg, DataSnapshot data, SnapshotUpdate update) throws ExecutionException, InterruptedException {
|
||||
initCurrentAlarm(ctx);
|
||||
lastMsgMetaData = msg.getMetaData();
|
||||
lastMsgQueueId = msg.getQueueId();
|
||||
lastMsgQueueName = msg.getQueueName();
|
||||
this.dataSnapshot = data;
|
||||
try {
|
||||
return createOrClearAlarms(ctx, msg, data, update, AlarmRuleState::eval);
|
||||
@ -195,7 +195,7 @@ class AlarmState {
|
||||
metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
|
||||
}
|
||||
setAlarmConditionMetadata(ruleState, metaData);
|
||||
TbMsg newMsg = ctx.newMsg(lastMsgQueueId != null ? lastMsgQueueId : null, "ALARM",
|
||||
TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : null, "ALARM",
|
||||
originator, msg != null ? msg.getCustomerId() : null, metaData, data);
|
||||
ctx.enqueueForTellNext(newMsg, relationType);
|
||||
}
|
||||
|
||||
@ -116,10 +116,10 @@ public class TbSendRPCRequestNode implements TbNode {
|
||||
|
||||
ctx.getRpcService().sendRpcRequestToDevice(request, ruleEngineDeviceRpcResponse -> {
|
||||
if (ruleEngineDeviceRpcResponse.getError().isEmpty()) {
|
||||
TbMsg next = ctx.newMsg(msg.getQueueId(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
|
||||
TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), ruleEngineDeviceRpcResponse.getResponse().orElse("{}"));
|
||||
ctx.enqueueForTellNext(next, TbRelationTypes.SUCCESS);
|
||||
} else {
|
||||
TbMsg next = ctx.newMsg(msg.getQueueId(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
TbMsg next = ctx.newMsg(msg.getQueueName(), msg.getType(), msg.getOriginator(), msg.getCustomerId(), msg.getMetaData(), wrap("error", ruleEngineDeviceRpcResponse.getError().get().name()));
|
||||
ctx.enqueueForTellFailure(next, ruleEngineDeviceRpcResponse.getError().get().name());
|
||||
}
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user