Single update message for multiple queues
This commit is contained in:
parent
bf0cc22223
commit
a571153b7c
@ -50,22 +50,15 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
|
||||
public Queue saveQueue(Queue queue) {
|
||||
boolean create = queue.getId() == null;
|
||||
Queue oldQueue;
|
||||
|
||||
if (create) {
|
||||
oldQueue = null;
|
||||
} else {
|
||||
oldQueue = queueService.findQueueById(queue.getTenantId(), queue.getId());
|
||||
}
|
||||
|
||||
//TODO: add checkNotNull
|
||||
Queue savedQueue = queueService.saveQueue(queue);
|
||||
|
||||
if (create) {
|
||||
onQueueCreated(savedQueue);
|
||||
} else {
|
||||
onQueueUpdated(savedQueue, oldQueue);
|
||||
}
|
||||
|
||||
createTopicsIfNeeded(savedQueue, oldQueue);
|
||||
tbClusterService.onQueuesUpdate(List.of(savedQueue));
|
||||
return savedQueue;
|
||||
}
|
||||
|
||||
@ -73,54 +66,14 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
|
||||
public void deleteQueue(TenantId tenantId, QueueId queueId) {
|
||||
Queue queue = queueService.findQueueById(tenantId, queueId);
|
||||
queueService.deleteQueue(tenantId, queueId);
|
||||
onQueueDeleted(queue);
|
||||
tbClusterService.onQueuesDelete(List.of(queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteQueueByQueueName(TenantId tenantId, String queueName) {
|
||||
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, queueName);
|
||||
queueService.deleteQueue(tenantId, queue.getId());
|
||||
onQueueDeleted(queue);
|
||||
}
|
||||
|
||||
private void onQueueCreated(Queue queue) {
|
||||
for (int i = 0; i < queue.getPartitions(); i++) {
|
||||
tbQueueAdmin.createTopicIfNotExists(
|
||||
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
|
||||
queue.getCustomProperties()
|
||||
);
|
||||
}
|
||||
|
||||
tbClusterService.onQueueChange(queue);
|
||||
}
|
||||
|
||||
private void onQueueUpdated(Queue queue, Queue oldQueue) {
|
||||
int oldPartitions = oldQueue.getPartitions();
|
||||
int currentPartitions = queue.getPartitions();
|
||||
|
||||
if (currentPartitions != oldPartitions) {
|
||||
if (currentPartitions > oldPartitions) {
|
||||
log.info("Added [{}] new partitions to [{}] queue", currentPartitions - oldPartitions, queue.getName());
|
||||
for (int i = oldPartitions; i < currentPartitions; i++) {
|
||||
tbQueueAdmin.createTopicIfNotExists(
|
||||
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
|
||||
queue.getCustomProperties()
|
||||
);
|
||||
}
|
||||
tbClusterService.onQueueChange(queue);
|
||||
} else {
|
||||
log.info("Removed [{}] partitions from [{}] queue", oldPartitions - currentPartitions, queue.getName());
|
||||
tbClusterService.onQueueChange(queue);
|
||||
// TODO: move all the messages left in old partitions and delete topics
|
||||
}
|
||||
} else if (!oldQueue.equals(queue)) {
|
||||
tbClusterService.onQueueChange(queue);
|
||||
}
|
||||
}
|
||||
|
||||
private void onQueueDeleted(Queue queue) {
|
||||
tbClusterService.onQueueDelete(queue);
|
||||
// queueStatsService.deleteQueueStatsByQueueId(tenantId, queueId);
|
||||
tbClusterService.onQueuesDelete(List.of(queue));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -176,26 +129,56 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
|
||||
log.debug("[{}] Handling profile queue config update: creating queues {}, updating {}, deleting {}. Affected tenants: {}",
|
||||
newTenantProfile.getUuidId(), toCreate, toUpdate, toRemove, tenantIds);
|
||||
}
|
||||
tenantIds.forEach(tenantId -> {
|
||||
toCreate.forEach(key -> saveQueue(new Queue(tenantId, newQueues.get(key))));
|
||||
|
||||
toUpdate.forEach(key -> {
|
||||
Queue queueToUpdate = new Queue(tenantId, newQueues.get(key));
|
||||
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, key);
|
||||
queueToUpdate.setId(foundQueue.getId());
|
||||
queueToUpdate.setCreatedTime(foundQueue.getCreatedTime());
|
||||
List<Queue> updated = new ArrayList<>();
|
||||
List<Queue> deleted = new ArrayList<>();
|
||||
for (TenantId tenantId : tenantIds) {
|
||||
for (String name : toCreate) {
|
||||
updated.add(new Queue(tenantId, newQueues.get(name)));
|
||||
}
|
||||
|
||||
if (!queueToUpdate.equals(foundQueue)) {
|
||||
saveQueue(queueToUpdate);
|
||||
for (String name : toUpdate) {
|
||||
Queue queue = new Queue(tenantId, newQueues.get(name));
|
||||
Queue foundQueue = queueService.findQueueByTenantIdAndName(tenantId, name);
|
||||
if (foundQueue != null) {
|
||||
queue.setId(foundQueue.getId());
|
||||
queue.setCreatedTime(foundQueue.getCreatedTime());
|
||||
}
|
||||
});
|
||||
if (!queue.equals(foundQueue)) {
|
||||
updated.add(queue);
|
||||
createTopicsIfNeeded(queue, foundQueue);
|
||||
}
|
||||
}
|
||||
|
||||
toRemove.forEach(q -> {
|
||||
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, q);
|
||||
QueueId queueIdForRemove = queue.getId();
|
||||
deleteQueue(tenantId, queueIdForRemove);
|
||||
for (String name : toRemove) {
|
||||
Queue queue = queueService.findQueueByTenantIdAndNameInternal(tenantId, name);
|
||||
deleted.add(queue);
|
||||
}
|
||||
}
|
||||
|
||||
if (!updated.isEmpty()) {
|
||||
updated = updated.stream()
|
||||
.map(queueService::saveQueue)
|
||||
.collect(Collectors.toList());
|
||||
tbClusterService.onQueuesUpdate(updated);
|
||||
}
|
||||
if (!deleted.isEmpty()) {
|
||||
deleted.forEach(queue -> {
|
||||
queueService.deleteQueue(queue.getTenantId(), queue.getId());
|
||||
});
|
||||
});
|
||||
tbClusterService.onQueuesDelete(deleted);
|
||||
}
|
||||
}
|
||||
|
||||
private void createTopicsIfNeeded(Queue queue, Queue oldQueue) {
|
||||
int newPartitions = queue.getPartitions();
|
||||
int oldPartitions = oldQueue != null ? oldQueue.getPartitions() : 0;
|
||||
for (int i = oldPartitions; i < newPartitions; i++) {
|
||||
tbQueueAdmin.createTopicIfNotExists(
|
||||
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
|
||||
queue.getCustomProperties()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -59,6 +59,8 @@ import org.thingsboard.server.common.msg.rule.engine.DeviceEdgeUpdateMsg;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceNameOrTypeUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
|
||||
@ -68,8 +70,8 @@ import org.thingsboard.server.queue.TbQueueCallback;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.MultipleTbQueueCallbackWrapper;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.util.DataDecodingEncodingService;
|
||||
import org.thingsboard.server.service.gateway_device.GatewayNotificationsService;
|
||||
@ -77,9 +79,11 @@ import org.thingsboard.server.service.ota.OtaPackageStateService;
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.common.util.ProtoUtils.toProto;
|
||||
|
||||
@ -552,40 +556,40 @@ public class DefaultTbClusterService implements TbClusterService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onQueueChange(Queue queue) {
|
||||
log.trace("[{}][{}] Processing queue change [{}] event", queue.getTenantId(), queue.getId(), queue.getName());
|
||||
public void onQueuesUpdate(List<Queue> queues) {
|
||||
List<QueueUpdateMsg> queueUpdateMsgs = queues.stream()
|
||||
.map(queue -> QueueUpdateMsg.newBuilder()
|
||||
.setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits())
|
||||
.setQueueIdMSB(queue.getId().getId().getMostSignificantBits())
|
||||
.setQueueIdLSB(queue.getId().getId().getLeastSignificantBits())
|
||||
.setQueueName(queue.getName())
|
||||
.setQueueTopic(queue.getTopic())
|
||||
.setPartitions(queue.getPartitions())
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
TransportProtos.QueueUpdateMsg queueUpdateMsg = TransportProtos.QueueUpdateMsg.newBuilder()
|
||||
.setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits())
|
||||
.setQueueIdMSB(queue.getId().getId().getMostSignificantBits())
|
||||
.setQueueIdLSB(queue.getId().getId().getLeastSignificantBits())
|
||||
.setQueueName(queue.getName())
|
||||
.setQueueTopic(queue.getTopic())
|
||||
.setPartitions(queue.getPartitions())
|
||||
.build();
|
||||
|
||||
ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
|
||||
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueUpdateMsg(queueUpdateMsg).build();
|
||||
ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build();
|
||||
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().addAllQueueUpdateMsgs(queueUpdateMsgs).build();
|
||||
doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onQueueDelete(Queue queue) {
|
||||
log.trace("[{}][{}] Processing queue delete [{}] event", queue.getTenantId(), queue.getId(), queue.getName());
|
||||
public void onQueuesDelete(List<Queue> queues) {
|
||||
List<QueueDeleteMsg> queueDeleteMsgs = queues.stream()
|
||||
.map(queue -> QueueDeleteMsg.newBuilder()
|
||||
.setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits())
|
||||
.setQueueIdMSB(queue.getId().getId().getMostSignificantBits())
|
||||
.setQueueIdLSB(queue.getId().getId().getLeastSignificantBits())
|
||||
.setQueueName(queue.getName())
|
||||
.build())
|
||||
.collect(Collectors.toList());
|
||||
|
||||
TransportProtos.QueueDeleteMsg queueDeleteMsg = TransportProtos.QueueDeleteMsg.newBuilder()
|
||||
.setTenantIdMSB(queue.getTenantId().getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(queue.getTenantId().getId().getLeastSignificantBits())
|
||||
.setQueueIdMSB(queue.getId().getId().getMostSignificantBits())
|
||||
.setQueueIdLSB(queue.getId().getId().getLeastSignificantBits())
|
||||
.setQueueName(queue.getName())
|
||||
.build();
|
||||
|
||||
ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
|
||||
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().setQueueDeleteMsg(queueDeleteMsg).build();
|
||||
ToRuleEngineNotificationMsg ruleEngineMsg = ToRuleEngineNotificationMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build();
|
||||
ToCoreNotificationMsg coreMsg = ToCoreNotificationMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build();
|
||||
ToTransportMsg transportMsg = ToTransportMsg.newBuilder().addAllQueueDeleteMsgs(queueDeleteMsgs).build();
|
||||
doSendQueueNotifications(ruleEngineMsg, coreMsg, transportMsg);
|
||||
}
|
||||
|
||||
|
||||
@ -391,13 +391,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
||||
} else if (!toCoreNotification.getFromEdgeSyncResponseMsg().isEmpty()) {
|
||||
//will be removed in 3.6.1 in favour of hasFromEdgeSyncResponse()
|
||||
forwardToAppActor(id, encodingService.decode(toCoreNotification.getFromEdgeSyncResponseMsg().toByteArray()), callback);
|
||||
} else if (toCoreNotification.hasQueueUpdateMsg()) {
|
||||
TransportProtos.QueueUpdateMsg queue = toCoreNotification.getQueueUpdateMsg();
|
||||
partitionService.updateQueue(queue);
|
||||
} else if (toCoreNotification.getQueueUpdateMsgsCount() > 0) {
|
||||
partitionService.updateQueues(toCoreNotification.getQueueUpdateMsgsList());
|
||||
callback.onSuccess();
|
||||
} else if (toCoreNotification.hasQueueDeleteMsg()) {
|
||||
TransportProtos.QueueDeleteMsg queue = toCoreNotification.getQueueDeleteMsg();
|
||||
partitionService.removeQueue(queue);
|
||||
} else if (toCoreNotification.getQueueDeleteMsgsCount() > 0) {
|
||||
partitionService.removeQueues(toCoreNotification.getQueueDeleteMsgsList());
|
||||
callback.onSuccess();
|
||||
} else if (toCoreNotification.hasVcResponseMsg()) {
|
||||
vcQueueService.processResponse(toCoreNotification.getVcResponseMsg());
|
||||
|
||||
@ -36,6 +36,8 @@ import org.thingsboard.server.common.util.ProtoUtils;
|
||||
import org.thingsboard.server.dao.queue.QueueService;
|
||||
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.QueueDeleteMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.QueueUpdateMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
@ -164,11 +166,11 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
, proto.getResponse(), error);
|
||||
tbDeviceRpcService.processRpcResponseFromDevice(response);
|
||||
callback.onSuccess();
|
||||
} else if (nfMsg.hasQueueUpdateMsg()) {
|
||||
updateQueue(nfMsg.getQueueUpdateMsg());
|
||||
} else if (nfMsg.getQueueUpdateMsgsCount() > 0) {
|
||||
updateQueues(nfMsg.getQueueUpdateMsgsList());
|
||||
callback.onSuccess();
|
||||
} else if (nfMsg.hasQueueDeleteMsg()) {
|
||||
deleteQueue(nfMsg.getQueueDeleteMsg());
|
||||
} else if (nfMsg.getQueueDeleteMsgsCount() > 0) {
|
||||
deleteQueues(nfMsg.getQueueDeleteMsgsList());
|
||||
callback.onSuccess();
|
||||
} else {
|
||||
log.trace("Received notification with missing handler");
|
||||
@ -176,39 +178,48 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
||||
}
|
||||
}
|
||||
|
||||
private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
|
||||
log.info("Received queue update msg: [{}]", queueUpdateMsg);
|
||||
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
||||
if (partitionService.isManagedByCurrentService(tenantId)) {
|
||||
QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
|
||||
String queueName = queueUpdateMsg.getQueueName();
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
|
||||
Queue queue = queueService.findQueueById(tenantId, queueId);
|
||||
private void updateQueues(List<QueueUpdateMsg> queueUpdateMsgs) {
|
||||
boolean partitionsChanged = false;
|
||||
for (QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) {
|
||||
log.info("Received queue update msg: [{}]", queueUpdateMsg);
|
||||
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
||||
if (partitionService.isManagedByCurrentService(tenantId)) {
|
||||
QueueId queueId = new QueueId(new UUID(queueUpdateMsg.getQueueIdMSB(), queueUpdateMsg.getQueueIdLSB()));
|
||||
String queueName = queueUpdateMsg.getQueueName();
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueName, tenantId);
|
||||
Queue queue = queueService.findQueueById(tenantId, queueId);
|
||||
|
||||
TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey);
|
||||
Queue oldQueue = consumerManager.getQueue();
|
||||
consumerManager.update(queue);
|
||||
TbRuleEngineQueueConsumerManager consumerManager = getOrCreateConsumer(queueKey);
|
||||
Queue oldQueue = consumerManager.getQueue();
|
||||
consumerManager.update(queue);
|
||||
|
||||
if (oldQueue != null && queue.getPartitions() == oldQueue.getPartitions()) {
|
||||
return;
|
||||
if (oldQueue == null || queue.getPartitions() != oldQueue.getPartitions()) {
|
||||
partitionsChanged = true;
|
||||
}
|
||||
} else {
|
||||
partitionsChanged = true;
|
||||
}
|
||||
}
|
||||
|
||||
partitionService.updateQueue(queueUpdateMsg);
|
||||
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(),
|
||||
new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
||||
if (partitionsChanged) {
|
||||
partitionService.updateQueues(queueUpdateMsgs);
|
||||
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(),
|
||||
new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
|
||||
log.info("Received queue delete msg: [{}]", queueDeleteMsg);
|
||||
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||
var consumerManager = consumers.remove(queueKey);
|
||||
if (consumerManager != null) {
|
||||
consumerManager.delete(true);
|
||||
private void deleteQueues(List<QueueDeleteMsg> queueDeleteMsgs) {
|
||||
for (QueueDeleteMsg queueDeleteMsg : queueDeleteMsgs) {
|
||||
log.info("Received queue delete msg: [{}]", queueDeleteMsg);
|
||||
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||
var consumerManager = consumers.remove(queueKey);
|
||||
if (consumerManager != null) {
|
||||
consumerManager.delete(true);
|
||||
}
|
||||
}
|
||||
|
||||
partitionService.removeQueue(queueDeleteMsg);
|
||||
partitionService.removeQueues(queueDeleteMsgs);
|
||||
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
||||
}
|
||||
|
||||
|
||||
@ -184,9 +184,9 @@ public class TbCoreConsumerStats {
|
||||
toCoreNfEdgeSyncResponseCounter.increment();
|
||||
} else if (!msg.getFromEdgeSyncResponseMsg().isEmpty()) {
|
||||
toCoreNfEdgeSyncResponseCounter.increment();
|
||||
} else if (msg.hasQueueUpdateMsg()) {
|
||||
} else if (msg.getQueueUpdateMsgsCount() > 0) {
|
||||
toCoreNfQueueUpdateCounter.increment();
|
||||
} else if (msg.hasQueueDeleteMsg()) {
|
||||
} else if (msg.getQueueDeleteMsgsCount() > 0) {
|
||||
toCoreNfQueueDeleteCounter.increment();
|
||||
} else if (msg.hasVcResponseMsg()) {
|
||||
toCoreNfVersionControlResponseCounter.increment();
|
||||
|
||||
@ -315,7 +315,7 @@ public class HashPartitionServiceTest {
|
||||
.setPartitions(isolatedQueue.getPartitions())
|
||||
.build();
|
||||
|
||||
partitionService_common.updateQueue(queueUpdateMsg);
|
||||
partitionService_common.updateQueues(List.of(queueUpdateMsg));
|
||||
partitionService_common.recalculatePartitions(commonRuleEngine, List.of(dedicatedRuleEngine));
|
||||
// expecting event about no partitions for isolated queue key
|
||||
verifyPartitionChangeEvent(event -> {
|
||||
@ -323,7 +323,7 @@ public class HashPartitionServiceTest {
|
||||
return event.getPartitionsMap().get(queueKey).isEmpty();
|
||||
});
|
||||
|
||||
partitionService_dedicated.updateQueue(queueUpdateMsg);
|
||||
partitionService_dedicated.updateQueues(List.of(queueUpdateMsg));
|
||||
partitionService_dedicated.recalculatePartitions(dedicatedRuleEngine, List.of(commonRuleEngine));
|
||||
verifyPartitionChangeEvent(event -> {
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
|
||||
@ -342,7 +342,7 @@ public class HashPartitionServiceTest {
|
||||
.setQueueIdLSB(isolatedQueue.getUuidId().getLeastSignificantBits())
|
||||
.setQueueName(isolatedQueue.getName())
|
||||
.build();
|
||||
partitionService_dedicated.removeQueue(queueDeleteMsg);
|
||||
partitionService_dedicated.removeQueues(List.of(queueDeleteMsg));
|
||||
verifyPartitionChangeEvent(event -> {
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, DataConstants.MAIN_QUEUE_NAME, tenantId);
|
||||
return event.getPartitionsMap().get(queueKey).isEmpty();
|
||||
|
||||
@ -40,6 +40,7 @@ import org.thingsboard.server.service.gateway_device.GatewayNotificationsService
|
||||
import org.thingsboard.server.service.profile.TbAssetProfileCache;
|
||||
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
@ -92,7 +93,7 @@ public class DefaultTbClusterServiceTest {
|
||||
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
clusterService.onQueuesUpdate(List.of(createTestQueue()));
|
||||
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH);
|
||||
verify(topicService, never()).getNotificationsTopic(eq(ServiceType.TB_CORE), any());
|
||||
@ -117,7 +118,7 @@ public class DefaultTbClusterServiceTest {
|
||||
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
clusterService.onQueuesUpdate(List.of(createTestQueue()));
|
||||
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1);
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2);
|
||||
@ -145,7 +146,7 @@ public class DefaultTbClusterServiceTest {
|
||||
when(producerProvider.getRuleEngineNotificationsMsgProducer()).thenReturn(tbREQueueProducer);
|
||||
when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
clusterService.onQueuesUpdate(List.of(createTestQueue()));
|
||||
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, MONOLITH);
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_TRANSPORT, TRANSPORT);
|
||||
@ -191,7 +192,7 @@ public class DefaultTbClusterServiceTest {
|
||||
when(producerProvider.getTbCoreNotificationsMsgProducer()).thenReturn(tbCoreQueueProducer);
|
||||
when(producerProvider.getTransportNotificationsMsgProducer()).thenReturn(tbTransportQueueProducer);
|
||||
|
||||
clusterService.onQueueChange(createTestQueue());
|
||||
clusterService.onQueuesUpdate(List.of(createTestQueue()));
|
||||
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith1);
|
||||
verify(topicService, times(1)).getNotificationsTopic(ServiceType.TB_RULE_ENGINE, monolith2);
|
||||
|
||||
@ -17,8 +17,12 @@ package org.thingsboard.server.queue;
|
||||
|
||||
import org.thingsboard.server.common.data.queue.Queue;
|
||||
|
||||
public interface TbQueueClusterService {
|
||||
void onQueueChange(Queue queue);
|
||||
import java.util.List;
|
||||
|
||||
public interface TbQueueClusterService {
|
||||
|
||||
void onQueuesUpdate(List<Queue> queues);
|
||||
|
||||
void onQueuesDelete(List<Queue> queues);
|
||||
|
||||
void onQueueDelete(Queue queue);
|
||||
}
|
||||
|
||||
@ -1280,8 +1280,8 @@ message ToCoreNotificationMsg {
|
||||
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
|
||||
bytes componentLifecycleMsg = 3 [deprecated = true];
|
||||
bytes edgeEventUpdateMsg = 4 [deprecated = true];
|
||||
QueueUpdateMsg queueUpdateMsg = 5;
|
||||
QueueDeleteMsg queueDeleteMsg = 6;
|
||||
repeated QueueUpdateMsg queueUpdateMsgs = 5;
|
||||
repeated QueueDeleteMsg queueDeleteMsgs = 6;
|
||||
VersionControlResponseMsg vcResponseMsg = 7;
|
||||
bytes toEdgeSyncRequestMsg = 8 [deprecated = true];
|
||||
bytes fromEdgeSyncResponseMsg = 9 [deprecated = true];
|
||||
@ -1307,8 +1307,8 @@ message ToRuleEngineMsg {
|
||||
message ToRuleEngineNotificationMsg {
|
||||
bytes componentLifecycleMsg = 1 [deprecated = true];
|
||||
FromDeviceRPCResponseProto fromDeviceRpcResponse = 2;
|
||||
QueueUpdateMsg queueUpdateMsg = 3;
|
||||
QueueDeleteMsg queueDeleteMsg = 4;
|
||||
repeated QueueUpdateMsg queueUpdateMsgs = 3;
|
||||
repeated QueueDeleteMsg queueDeleteMsgs = 4;
|
||||
ComponentLifecycleMsgProto componentLifecycle = 5;
|
||||
}
|
||||
|
||||
@ -1328,8 +1328,8 @@ message ToTransportMsg {
|
||||
ResourceUpdateMsg resourceUpdateMsg = 12;
|
||||
ResourceDeleteMsg resourceDeleteMsg = 13;
|
||||
UplinkNotificationMsg uplinkNotificationMsg = 14;
|
||||
QueueUpdateMsg queueUpdateMsg = 15;
|
||||
QueueDeleteMsg queueDeleteMsg = 16;
|
||||
repeated QueueUpdateMsg queueUpdateMsgs = 15;
|
||||
repeated QueueDeleteMsg queueDeleteMsgs = 16;
|
||||
}
|
||||
|
||||
message UsageStatsKVProto{
|
||||
|
||||
@ -171,27 +171,36 @@ public class HashPartitionService implements PartitionService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
|
||||
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
|
||||
partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
|
||||
partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
|
||||
myPartitions.remove(queueKey);
|
||||
if (!tenantId.isSysTenantId()) {
|
||||
tenantRoutingInfoMap.remove(tenantId);
|
||||
public void updateQueues(List<TransportProtos.QueueUpdateMsg> queueUpdateMsgs) {
|
||||
for (TransportProtos.QueueUpdateMsg queueUpdateMsg : queueUpdateMsgs) {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueUpdateMsg.getQueueName(), tenantId);
|
||||
partitionTopicsMap.put(queueKey, queueUpdateMsg.getQueueTopic());
|
||||
partitionSizesMap.put(queueKey, queueUpdateMsg.getPartitions());
|
||||
myPartitions.remove(queueKey);
|
||||
if (!tenantId.isSysTenantId()) {
|
||||
tenantRoutingInfoMap.remove(tenantId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg) {
|
||||
TenantId tenantId = new TenantId(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||
myPartitions.remove(queueKey);
|
||||
partitionTopicsMap.remove(queueKey);
|
||||
partitionSizesMap.remove(queueKey);
|
||||
evictTenantInfo(tenantId);
|
||||
public void removeQueues(List<TransportProtos.QueueDeleteMsg> queueDeleteMsgs) {
|
||||
List<QueueKey> queueKeys = queueDeleteMsgs.stream()
|
||||
.map(queueDeleteMsg -> {
|
||||
TenantId tenantId = TenantId.fromUUID(new UUID(queueDeleteMsg.getTenantIdMSB(), queueDeleteMsg.getTenantIdLSB()));
|
||||
return new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
queueKeys.forEach(queueKey -> {
|
||||
myPartitions.remove(queueKey);
|
||||
partitionTopicsMap.remove(queueKey);
|
||||
partitionSizesMap.remove(queueKey);
|
||||
evictTenantInfo(queueKey.getTenantId());
|
||||
});
|
||||
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
|
||||
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet()));
|
||||
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, queueKeys.stream()
|
||||
.collect(Collectors.toMap(k -> k, k -> Collections.emptySet())));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -63,9 +63,9 @@ public interface PartitionService {
|
||||
|
||||
int countTransportsByType(String type);
|
||||
|
||||
void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg);
|
||||
void updateQueues(List<TransportProtos.QueueUpdateMsg> queueUpdateMsgs);
|
||||
|
||||
void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg);
|
||||
void removeQueues(List<TransportProtos.QueueDeleteMsg> queueDeleteMsgs);
|
||||
|
||||
void removeTenant(TenantId tenantId);
|
||||
|
||||
|
||||
@ -50,9 +50,9 @@ import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.TenantProfileId;
|
||||
import org.thingsboard.server.common.data.limit.LimitedApi;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.notification.rule.trigger.RateLimitsTrigger;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||
import org.thingsboard.server.common.data.rpc.RpcStatus;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
@ -96,10 +96,9 @@ import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
|
||||
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
|
||||
@ -1056,10 +1055,10 @@ public class DefaultTransportService implements TransportService {
|
||||
log.warn("ResourceDelete - [{}] [{}]", id, mdRez);
|
||||
transportCallbackExecutor.submit(() -> mdRez.getListener().onResourceDelete(msg));
|
||||
});
|
||||
} else if (toSessionMsg.hasQueueUpdateMsg()) {
|
||||
partitionService.updateQueue(toSessionMsg.getQueueUpdateMsg());
|
||||
} else if (toSessionMsg.hasQueueDeleteMsg()) {
|
||||
partitionService.removeQueue(toSessionMsg.getQueueDeleteMsg());
|
||||
} else if (toSessionMsg.getQueueUpdateMsgsCount() > 0) {
|
||||
partitionService.updateQueues(toSessionMsg.getQueueUpdateMsgsList());
|
||||
} else if (toSessionMsg.getQueueDeleteMsgsCount() > 0) {
|
||||
partitionService.removeQueues(toSessionMsg.getQueueDeleteMsgsList());
|
||||
} else {
|
||||
//TODO: should we notify the device actor about missed session?
|
||||
log.debug("[{}] Missing session.", sessionId);
|
||||
|
||||
@ -57,9 +57,6 @@ public class BaseQueueService extends AbstractEntityService implements QueueServ
|
||||
@Autowired
|
||||
private DataValidator<Queue> queueValidator;
|
||||
|
||||
// @Autowired
|
||||
// private QueueStatsService queueStatsService;
|
||||
|
||||
@Override
|
||||
public Queue saveQueue(Queue queue) {
|
||||
log.trace("Executing createOrUpdateQueue [{}]", queue);
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user