using repartitionExecutor instead of synchronized

This commit is contained in:
YevhenBondarenko 2022-05-19 00:36:26 +02:00
parent 92117d1280
commit 4f7c166566
2 changed files with 5 additions and 4 deletions

View File

@ -393,10 +393,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
tbDeviceRpcService.processRpcResponseFromDevice(response); tbDeviceRpcService.processRpcResponseFromDevice(response);
callback.onSuccess(); callback.onSuccess();
} else if (nfMsg.hasQueueUpdateMsg()) { } else if (nfMsg.hasQueueUpdateMsg()) {
updateQueue(nfMsg.getQueueUpdateMsg()); repartitionExecutor.execute(() -> updateQueue(nfMsg.getQueueUpdateMsg()));
callback.onSuccess(); callback.onSuccess();
} else if (nfMsg.hasQueueDeleteMsg()) { } else if (nfMsg.hasQueueDeleteMsg()) {
deleteQueue(nfMsg.getQueueDeleteMsg()); repartitionExecutor.execute(() -> deleteQueue(nfMsg.getQueueDeleteMsg()));
callback.onSuccess(); callback.onSuccess();
} else { } else {
log.trace("Received notification with missing handler"); log.trace("Received notification with missing handler");
@ -404,7 +404,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
} }
} }
private synchronized void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) { private void updateQueue(TransportProtos.QueueUpdateMsg queueUpdateMsg) {
log.info("Received queue update msg: [{}]", queueUpdateMsg); log.info("Received queue update msg: [{}]", queueUpdateMsg);
String queueName = queueUpdateMsg.getQueueName(); String queueName = queueUpdateMsg.getQueueName();
TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB())); TenantId tenantId = new TenantId(new UUID(queueUpdateMsg.getTenantIdMSB(), queueUpdateMsg.getTenantIdLSB()));

View File

@ -173,7 +173,6 @@ public class HashPartitionService implements PartitionService {
} else { } else {
QueueRoutingInfo queueRoutingInfo = queuesById.get(queueId); QueueRoutingInfo queueRoutingInfo = queuesById.get(queueId);
//TODO: replace if we can notify CheckPoint rule nodes about queue changes
if (queueRoutingInfo == null) { if (queueRoutingInfo == null) {
log.debug("Queue was removed but still used in CheckPoint rule node. [{}][{}]", tenantId, entityId); log.debug("Queue was removed but still used in CheckPoint rule node. [{}][{}]", tenantId, entityId);
queueKey = getMainQueueKey(serviceType, tenantId); queueKey = getMainQueueKey(serviceType, tenantId);
@ -205,6 +204,8 @@ public class HashPartitionService implements PartitionService {
@Override @Override
public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) { public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
partitionsInit();
tbTransportServicesByType.clear(); tbTransportServicesByType.clear();
logServiceInfo(currentService); logServiceInfo(currentService);
otherServices.forEach(this::logServiceInfo); otherServices.forEach(this::logServiceInfo);