Refactoring and fixes for partitions recalculation
This commit is contained in:
parent
2824ca6acc
commit
8e126e57dc
@ -373,10 +373,6 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
|
|||||||
} else if (toCoreNotification.hasComponentLifecycle()) {
|
} else if (toCoreNotification.hasComponentLifecycle()) {
|
||||||
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCoreNotification.getComponentLifecycle()));
|
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(toCoreNotification.getComponentLifecycle()));
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
} else if (!toCoreNotification.getComponentLifecycleMsg().isEmpty()) {
|
|
||||||
//will be removed in 3.6.1 in favour of hasComponentLifecycle()
|
|
||||||
handleComponentLifecycleMsg(id, toCoreNotification.getComponentLifecycleMsg());
|
|
||||||
callback.onSuccess();
|
|
||||||
} else if (toCoreNotification.hasEdgeEventUpdate()) {
|
} else if (toCoreNotification.hasEdgeEventUpdate()) {
|
||||||
forwardToAppActor(id, ProtoUtils.fromProto(toCoreNotification.getEdgeEventUpdate()));
|
forwardToAppActor(id, ProtoUtils.fromProto(toCoreNotification.getEdgeEventUpdate()));
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
|
|||||||
@ -18,13 +18,17 @@ package org.thingsboard.server.service.queue;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.thingsboard.server.actors.ActorSystemContext;
|
import org.thingsboard.server.actors.ActorSystemContext;
|
||||||
|
import org.thingsboard.server.common.data.EntityType;
|
||||||
import org.thingsboard.server.common.data.id.QueueId;
|
import org.thingsboard.server.common.data.id.QueueId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||||
import org.thingsboard.server.common.data.queue.Queue;
|
import org.thingsboard.server.common.data.queue.Queue;
|
||||||
import org.thingsboard.server.common.data.rpc.RpcError;
|
import org.thingsboard.server.common.data.rpc.RpcError;
|
||||||
|
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
|
||||||
@ -56,6 +60,7 @@ import java.util.Optional;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@TbRuleEngineComponent
|
@TbRuleEngineComponent
|
||||||
@ -152,10 +157,6 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
if (nfMsg.hasComponentLifecycle()) {
|
if (nfMsg.hasComponentLifecycle()) {
|
||||||
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(nfMsg.getComponentLifecycle()));
|
handleComponentLifecycleMsg(id, ProtoUtils.fromProto(nfMsg.getComponentLifecycle()));
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
} else if (!nfMsg.getComponentLifecycleMsg().isEmpty()) {
|
|
||||||
//will be removed in 3.6.1 in favour of hasComponentLifecycle()
|
|
||||||
handleComponentLifecycleMsg(id, nfMsg.getComponentLifecycleMsg());
|
|
||||||
callback.onSuccess();
|
|
||||||
} else if (nfMsg.hasFromDeviceRpcResponse()) {
|
} else if (nfMsg.hasFromDeviceRpcResponse()) {
|
||||||
TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
|
TransportProtos.FromDeviceRPCResponseProto proto = nfMsg.getFromDeviceRpcResponse();
|
||||||
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
|
RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
|
||||||
@ -164,10 +165,10 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
tbDeviceRpcService.processRpcResponseFromDevice(response);
|
tbDeviceRpcService.processRpcResponseFromDevice(response);
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
} else if (nfMsg.hasQueueUpdateMsg()) {
|
} else if (nfMsg.hasQueueUpdateMsg()) {
|
||||||
ctx.getScheduler().execute(() -> updateQueue(nfMsg.getQueueUpdateMsg()));
|
updateQueue(nfMsg.getQueueUpdateMsg());
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
} else if (nfMsg.hasQueueDeleteMsg()) {
|
} else if (nfMsg.hasQueueDeleteMsg()) {
|
||||||
ctx.getScheduler().execute(() -> deleteQueue(nfMsg.getQueueDeleteMsg()));
|
deleteQueue(nfMsg.getQueueDeleteMsg());
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
} else {
|
} else {
|
||||||
log.trace("Received notification with missing handler");
|
log.trace("Received notification with missing handler");
|
||||||
@ -204,13 +205,30 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
|
|||||||
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
QueueKey queueKey = new QueueKey(ServiceType.TB_RULE_ENGINE, queueDeleteMsg.getQueueName(), tenantId);
|
||||||
var consumerManager = consumers.remove(queueKey);
|
var consumerManager = consumers.remove(queueKey);
|
||||||
if (consumerManager != null) {
|
if (consumerManager != null) {
|
||||||
consumerManager.delete();
|
consumerManager.delete(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
partitionService.removeQueue(queueDeleteMsg);
|
partitionService.removeQueue(queueDeleteMsg);
|
||||||
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
partitionService.recalculatePartitions(ctx.getServiceInfoProvider().getServiceInfo(), new ArrayList<>(partitionService.getOtherServices(ServiceType.TB_RULE_ENGINE)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@EventListener
|
||||||
|
public void handleComponentLifecycleEvent(ComponentLifecycleMsg event) {
|
||||||
|
if (event.getEntityId().getEntityType() == EntityType.TENANT) {
|
||||||
|
if (event.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||||
|
List<QueueKey> toRemove = consumers.keySet().stream()
|
||||||
|
.filter(queueKey -> queueKey.getTenantId().equals(event.getTenantId()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
toRemove.forEach(queueKey -> {
|
||||||
|
var consumerManager = consumers.remove(queueKey);
|
||||||
|
if (consumerManager != null) {
|
||||||
|
consumerManager.delete(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) {
|
private TbRuleEngineQueueConsumerManager getOrCreateConsumer(QueueKey queueKey) {
|
||||||
return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key));
|
return consumers.computeIfAbsent(queueKey, key -> new TbRuleEngineQueueConsumerManager(ctx, key));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,7 +15,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.queue.processing;
|
package org.thingsboard.server.service.queue.processing;
|
||||||
|
|
||||||
import com.google.protobuf.ByteString;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
@ -30,7 +29,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
|
|||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.id.TenantProfileId;
|
import org.thingsboard.server.common.data.id.TenantProfileId;
|
||||||
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||||
import org.thingsboard.server.common.msg.TbActorMsg;
|
|
||||||
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
@ -166,16 +164,9 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// To be removed in 3.6.1 in favour of handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg)
|
protected final void handleComponentLifecycleMsg(UUID id, ComponentLifecycleMsg componentLifecycleMsg) {
|
||||||
protected void handleComponentLifecycleMsg(UUID id, ByteString nfMsg) {
|
TenantId tenantId = componentLifecycleMsg.getTenantId();
|
||||||
Optional<TbActorMsg> actorMsgOpt = encodingService.decode(nfMsg.toByteArray());
|
log.debug("[{}][{}][{}] Received Lifecycle event: {}", tenantId, componentLifecycleMsg.getEntityId().getEntityType(),
|
||||||
actorMsgOpt.ifPresent(tbActorMsg -> handleComponentLifecycleMsg(id, tbActorMsg));
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void handleComponentLifecycleMsg(UUID id, TbActorMsg actorMsg) {
|
|
||||||
if (actorMsg instanceof ComponentLifecycleMsg) {
|
|
||||||
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
|
|
||||||
log.debug("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(),
|
|
||||||
componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent());
|
componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent());
|
||||||
if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId());
|
TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId());
|
||||||
@ -184,39 +175,40 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
|
|||||||
apiUsageStateService.onTenantProfileUpdate(tenantProfileId);
|
apiUsageStateService.onTenantProfileUpdate(tenantProfileId);
|
||||||
}
|
}
|
||||||
} else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
if (TenantId.SYS_TENANT_ID.equals(componentLifecycleMsg.getTenantId())) {
|
if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
|
||||||
jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings);
|
jwtSettingsService.ifPresent(JwtSettingsService::reloadJwtSettings);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tenantProfileCache.evict(componentLifecycleMsg.getTenantId());
|
tenantProfileCache.evict(tenantId);
|
||||||
partitionService.removeTenant(componentLifecycleMsg.getTenantId());
|
partitionService.evictTenantInfo(tenantId);
|
||||||
if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) {
|
if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.UPDATED)) {
|
||||||
apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId());
|
apiUsageStateService.onTenantUpdate(tenantId);
|
||||||
} else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) {
|
} else if (componentLifecycleMsg.getEvent().equals(ComponentLifecycleEvent.DELETED)) {
|
||||||
apiUsageStateService.onTenantDelete((TenantId) componentLifecycleMsg.getEntityId());
|
apiUsageStateService.onTenantDelete(tenantId);
|
||||||
|
partitionService.removeTenant(tenantId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
deviceProfileCache.evict(tenantId, new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||||
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
deviceProfileCache.evict(tenantId, new DeviceId(componentLifecycleMsg.getEntityId().getId()));
|
||||||
} else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.ASSET_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetProfileId(componentLifecycleMsg.getEntityId().getId()));
|
assetProfileCache.evict(tenantId, new AssetProfileId(componentLifecycleMsg.getEntityId().getId()));
|
||||||
} else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.ASSET.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
assetProfileCache.evict(componentLifecycleMsg.getTenantId(), new AssetId(componentLifecycleMsg.getEntityId().getId()));
|
assetProfileCache.evict(tenantId, new AssetId(componentLifecycleMsg.getEntityId().getId()));
|
||||||
} else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.ENTITY_VIEW.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg);
|
actorContext.getTbEntityViewService().onComponentLifecycleMsg(componentLifecycleMsg);
|
||||||
} else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.API_USAGE_STATE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
apiUsageStateService.onApiUsageStateUpdate(componentLifecycleMsg.getTenantId());
|
apiUsageStateService.onApiUsageStateUpdate(tenantId);
|
||||||
} else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
} else if (EntityType.CUSTOMER.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
|
||||||
if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) {
|
if (componentLifecycleMsg.getEvent() == ComponentLifecycleEvent.DELETED) {
|
||||||
apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId());
|
apiUsageStateService.onCustomerDelete((CustomerId) componentLifecycleMsg.getEntityId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
eventPublisher.publishEvent(componentLifecycleMsg);
|
eventPublisher.publishEvent(componentLifecycleMsg);
|
||||||
}
|
log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, componentLifecycleMsg);
|
||||||
log.trace("[{}] Forwarding component lifecycle message to App Actor {}", id, actorMsg);
|
actorContext.tellWithHighPriority(componentLifecycleMsg);
|
||||||
actorContext.tellWithHighPriority(actorMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbCallback callback) throws Exception;
|
protected abstract void handleNotification(UUID id, TbProtoQueueMsg<N> msg, TbCallback callback) throws Exception;
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.queue.ruleengine;
|
package org.thingsboard.server.service.queue.ruleengine;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Getter;
|
import lombok.Getter;
|
||||||
import lombok.ToString;
|
import lombok.ToString;
|
||||||
import org.thingsboard.server.common.data.queue.Queue;
|
import org.thingsboard.server.common.data.queue.Queue;
|
||||||
@ -24,24 +25,24 @@ import java.util.Set;
|
|||||||
|
|
||||||
@Getter
|
@Getter
|
||||||
@ToString
|
@ToString
|
||||||
|
@AllArgsConstructor
|
||||||
public class TbQueueConsumerManagerTask {
|
public class TbQueueConsumerManagerTask {
|
||||||
|
|
||||||
private final QueueEvent event;
|
private final QueueEvent event;
|
||||||
private Queue queue;
|
private Queue queue;
|
||||||
private Set<TopicPartitionInfo> partitions;
|
private Set<TopicPartitionInfo> partitions;
|
||||||
|
private boolean drainQueue;
|
||||||
|
|
||||||
public TbQueueConsumerManagerTask(QueueEvent event) {
|
public static TbQueueConsumerManagerTask delete(boolean drainQueue) {
|
||||||
this.event = event;
|
return new TbQueueConsumerManagerTask(QueueEvent.DELETE, null, null, drainQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TbQueueConsumerManagerTask(QueueEvent event, Queue queue) {
|
public static TbQueueConsumerManagerTask configUpdate(Queue queue) {
|
||||||
this.event = event;
|
return new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue, null, false);
|
||||||
this.queue = queue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public TbQueueConsumerManagerTask(QueueEvent event, Set<TopicPartitionInfo> partitions) {
|
public static TbQueueConsumerManagerTask partitionChange(Set<TopicPartitionInfo> partitions) {
|
||||||
this.event = event;
|
return new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, null, partitions, false);
|
||||||
this.partitions = partitions;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -95,15 +95,15 @@ public class TbRuleEngineQueueConsumerManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void update(Queue queue) {
|
public void update(Queue queue) {
|
||||||
addTask(new TbQueueConsumerManagerTask(QueueEvent.CONFIG_UPDATE, queue));
|
addTask(TbQueueConsumerManagerTask.configUpdate(queue));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void update(Set<TopicPartitionInfo> partitions) {
|
public void update(Set<TopicPartitionInfo> partitions) {
|
||||||
addTask(new TbQueueConsumerManagerTask(QueueEvent.PARTITION_CHANGE, partitions));
|
addTask(TbQueueConsumerManagerTask.partitionChange(partitions));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void delete() {
|
public void delete(boolean drainQueue) {
|
||||||
addTask(new TbQueueConsumerManagerTask(QueueEvent.DELETE));
|
addTask(TbQueueConsumerManagerTask.delete(drainQueue));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addTask(TbQueueConsumerManagerTask todo) {
|
private void addTask(TbQueueConsumerManagerTask todo) {
|
||||||
@ -138,7 +138,7 @@ public class TbRuleEngineQueueConsumerManager {
|
|||||||
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) {
|
} else if (task.getEvent() == QueueEvent.CONFIG_UPDATE) {
|
||||||
newConfiguration = task.getQueue();
|
newConfiguration = task.getQueue();
|
||||||
} else if (task.getEvent() == QueueEvent.DELETE) {
|
} else if (task.getEvent() == QueueEvent.DELETE) {
|
||||||
doDelete();
|
doDelete(task.isDrainQueue());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -205,7 +205,7 @@ public class TbRuleEngineQueueConsumerManager {
|
|||||||
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
|
log.debug("[{}] Unsubscribed and stopped consumers", queueKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doDelete() {
|
private void doDelete(boolean drainQueue) {
|
||||||
stopped = true;
|
stopped = true;
|
||||||
log.info("[{}] Handling queue deletion", queueKey);
|
log.info("[{}] Handling queue deletion", queueKey);
|
||||||
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
|
consumerWrapper.getConsumers().forEach(TbQueueConsumerTask::awaitCompletion);
|
||||||
@ -213,7 +213,9 @@ public class TbRuleEngineQueueConsumerManager {
|
|||||||
List<TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> queueConsumers = consumerWrapper.getConsumers().stream()
|
List<TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>>> queueConsumers = consumerWrapper.getConsumers().stream()
|
||||||
.map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList());
|
.map(TbQueueConsumerTask::getConsumer).collect(Collectors.toList());
|
||||||
ctx.getConsumersExecutor().submit(() -> {
|
ctx.getConsumersExecutor().submit(() -> {
|
||||||
|
if (drainQueue) {
|
||||||
drainQueue(queueConsumers);
|
drainQueue(queueConsumers);
|
||||||
|
}
|
||||||
|
|
||||||
queueConsumers.forEach(consumer -> {
|
queueConsumers.forEach(consumer -> {
|
||||||
for (String topic : consumer.getFullTopicNames()) {
|
for (String topic : consumer.getFullTopicNames()) {
|
||||||
|
|||||||
@ -40,6 +40,7 @@ import org.thingsboard.server.common.data.Tenant;
|
|||||||
import org.thingsboard.server.common.data.TenantInfo;
|
import org.thingsboard.server.common.data.TenantInfo;
|
||||||
import org.thingsboard.server.common.data.TenantProfile;
|
import org.thingsboard.server.common.data.TenantProfile;
|
||||||
import org.thingsboard.server.common.data.User;
|
import org.thingsboard.server.common.data.User;
|
||||||
|
import org.thingsboard.server.common.data.exception.TenantNotFoundException;
|
||||||
import org.thingsboard.server.common.data.id.EntityId;
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
@ -64,6 +65,7 @@ import org.thingsboard.server.dao.service.DaoSqlTest;
|
|||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.TbQueueAdmin;
|
import org.thingsboard.server.queue.TbQueueAdmin;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
|
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -71,7 +73,6 @@ import java.util.Comparator;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
@ -80,6 +81,7 @@ import java.util.function.Predicate;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.mockito.ArgumentMatchers.argThat;
|
import static org.mockito.ArgumentMatchers.argThat;
|
||||||
@ -700,6 +702,45 @@ public class TenantControllerTest extends AbstractControllerTest {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void whenTenantIsDeleted_thenDeleteQueues() throws Exception {
|
||||||
|
loginSysAdmin();
|
||||||
|
TenantProfile tenantProfile = new TenantProfile();
|
||||||
|
tenantProfile.setName("Test profile");
|
||||||
|
TenantProfileData tenantProfileData = new TenantProfileData();
|
||||||
|
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
|
||||||
|
tenantProfile.setProfileData(tenantProfileData);
|
||||||
|
tenantProfile.setIsolatedTbRuleEngine(true);
|
||||||
|
addQueueConfig(tenantProfile, MAIN_QUEUE_NAME);
|
||||||
|
tenantProfile = doPost("/api/tenantProfile", tenantProfile, TenantProfile.class);
|
||||||
|
createDifferentTenant();
|
||||||
|
loginSysAdmin();
|
||||||
|
savedDifferentTenant.setTenantProfileId(tenantProfile.getId());
|
||||||
|
savedDifferentTenant = doPost("/api/tenant", savedDifferentTenant, Tenant.class);
|
||||||
|
TenantId tenantId = differentTenantId;
|
||||||
|
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNotNull();
|
||||||
|
});
|
||||||
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId);
|
||||||
|
assertThat(tpi.getTenantId()).hasValue(tenantId);
|
||||||
|
TbMsg tbMsg = publishTbMsg(tenantId, tpi);
|
||||||
|
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
verify(actorContext).tell(argThat(msg -> {
|
||||||
|
return msg instanceof QueueToRuleEngineMsg && ((QueueToRuleEngineMsg) msg).getMsg().getId().equals(tbMsg.getId());
|
||||||
|
}));
|
||||||
|
});
|
||||||
|
|
||||||
|
deleteDifferentTenant();
|
||||||
|
|
||||||
|
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
assertThat(partitionService.getMyPartitions(new QueueKey(ServiceType.TB_RULE_ENGINE, tenantId))).isNull();
|
||||||
|
assertThatThrownBy(() -> partitionService.resolve(ServiceType.TB_RULE_ENGINE, tenantId, tenantId))
|
||||||
|
.isInstanceOf(TenantNotFoundException.class);
|
||||||
|
|
||||||
|
verify(queueAdmin).deleteTopic(eq(tpi.getFullTopicName()));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) {
|
private TbMsg publishTbMsg(TenantId tenantId, TopicPartitionInfo tpi) {
|
||||||
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}");
|
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, tenantId, TbMsgMetaData.EMPTY, "{\"test\":1}");
|
||||||
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
|
||||||
@ -759,7 +800,7 @@ public class TenantControllerTest extends AbstractControllerTest {
|
|||||||
queueConfiguration.setName(queueName);
|
queueConfiguration.setName(queueName);
|
||||||
queueConfiguration.setTopic(topic);
|
queueConfiguration.setTopic(topic);
|
||||||
queueConfiguration.setPollInterval(25);
|
queueConfiguration.setPollInterval(25);
|
||||||
queueConfiguration.setPartitions(1 + new Random().nextInt(99));
|
queueConfiguration.setPartitions(12);
|
||||||
queueConfiguration.setConsumerPerPartition(true);
|
queueConfiguration.setConsumerPerPartition(true);
|
||||||
queueConfiguration.setPackProcessingTimeout(2000);
|
queueConfiguration.setPackProcessingTimeout(2000);
|
||||||
SubmitStrategy submitStrategy = new SubmitStrategy();
|
SubmitStrategy submitStrategy = new SubmitStrategy();
|
||||||
@ -799,10 +840,10 @@ public class TenantControllerTest extends AbstractControllerTest {
|
|||||||
ArgumentMatcher<Tenant> matcherTenant = cntTime == 1 ? argument -> argument.equals(tenant) :
|
ArgumentMatcher<Tenant> matcherTenant = cntTime == 1 ? argument -> argument.equals(tenant) :
|
||||||
argument -> argument.getClass().equals(Tenant.class);
|
argument -> argument.getClass().equals(Tenant.class);
|
||||||
if (ComponentLifecycleEvent.DELETED.equals(event)) {
|
if (ComponentLifecycleEvent.DELETED.equals(event)) {
|
||||||
Mockito.verify(tbClusterService, times( cntTime)).onTenantDelete(Mockito.argThat(matcherTenant),
|
Mockito.verify(tbClusterService, times(cntTime)).onTenantDelete(Mockito.argThat(matcherTenant),
|
||||||
Mockito.isNull());
|
Mockito.isNull());
|
||||||
} else {
|
} else {
|
||||||
Mockito.verify(tbClusterService, times( cntTime)).onTenantChange(Mockito.argThat(matcherTenant),
|
Mockito.verify(tbClusterService, times(cntTime)).onTenantChange(Mockito.argThat(matcherTenant),
|
||||||
Mockito.isNull());
|
Mockito.isNull());
|
||||||
}
|
}
|
||||||
TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant);
|
TenantId tenantId = cntTime == 1 ? tenant.getId() : (TenantId) createEntityId_NULL_UUID(tenant);
|
||||||
|
|||||||
@ -447,7 +447,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
verifyMsgProcessed(consumer1.testMsg);
|
verifyMsgProcessed(consumer1.testMsg);
|
||||||
verifyMsgProcessed(consumer2.testMsg);
|
verifyMsgProcessed(consumer2.testMsg);
|
||||||
|
|
||||||
consumerManager.delete();
|
consumerManager.delete(true);
|
||||||
|
|
||||||
await().atMost(2, TimeUnit.SECONDS)
|
await().atMost(2, TimeUnit.SECONDS)
|
||||||
.untilAsserted(() -> {
|
.untilAsserted(() -> {
|
||||||
@ -488,7 +488,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
|
|||||||
verifySubscribedAndLaunched(consumer, partitions);
|
verifySubscribedAndLaunched(consumer, partitions);
|
||||||
verifyMsgProcessed(consumer.testMsg);
|
verifyMsgProcessed(consumer.testMsg);
|
||||||
|
|
||||||
consumerManager.delete();
|
consumerManager.delete(true);
|
||||||
|
|
||||||
await().atMost(2, TimeUnit.SECONDS)
|
await().atMost(2, TimeUnit.SECONDS)
|
||||||
.untilAsserted(() -> {
|
.untilAsserted(() -> {
|
||||||
|
|||||||
@ -35,7 +35,6 @@ import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent;
|
|||||||
import org.thingsboard.server.queue.util.AfterStartUp;
|
import org.thingsboard.server.queue.util.AfterStartUp;
|
||||||
|
|
||||||
import javax.annotation.PostConstruct;
|
import javax.annotation.PostConstruct;
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -190,14 +189,25 @@ public class HashPartitionService implements PartitionService {
|
|||||||
myPartitions.remove(queueKey);
|
myPartitions.remove(queueKey);
|
||||||
partitionTopicsMap.remove(queueKey);
|
partitionTopicsMap.remove(queueKey);
|
||||||
partitionSizesMap.remove(queueKey);
|
partitionSizesMap.remove(queueKey);
|
||||||
//TODO: remove after merging tb entity services
|
evictTenantInfo(tenantId);
|
||||||
removeTenant(tenantId);
|
|
||||||
|
|
||||||
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
|
if (serviceInfoProvider.isService(ServiceType.TB_RULE_ENGINE)) {
|
||||||
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet()));
|
publishPartitionChangeEvent(ServiceType.TB_RULE_ENGINE, Map.of(queueKey, Collections.emptySet()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeTenant(TenantId tenantId) {
|
||||||
|
List<QueueKey> queueKeys = partitionSizesMap.keySet().stream()
|
||||||
|
.filter(queueKey -> tenantId.equals(queueKey.getTenantId()))
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
queueKeys.forEach(queueKey -> {
|
||||||
|
myPartitions.remove(queueKey);
|
||||||
|
partitionTopicsMap.remove(queueKey);
|
||||||
|
partitionSizesMap.remove(queueKey);
|
||||||
|
});
|
||||||
|
evictTenantInfo(tenantId);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isManagedByCurrentService(TenantId tenantId) {
|
public boolean isManagedByCurrentService(TenantId tenantId) {
|
||||||
Set<UUID> assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles();
|
Set<UUID> assignedTenantProfiles = serviceInfoProvider.getAssignedTenantProfiles();
|
||||||
@ -258,6 +268,7 @@ public class HashPartitionService implements PartitionService {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
|
public synchronized void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
|
||||||
|
log.info("Recalculating partitions");
|
||||||
tbTransportServicesByType.clear();
|
tbTransportServicesByType.clear();
|
||||||
responsibleServices.clear();
|
responsibleServices.clear();
|
||||||
logServiceInfo(currentService);
|
logServiceInfo(currentService);
|
||||||
@ -274,10 +285,15 @@ public class HashPartitionService implements PartitionService {
|
|||||||
final ConcurrentMap<QueueKey, List<Integer>> newPartitions = new ConcurrentHashMap<>();
|
final ConcurrentMap<QueueKey, List<Integer>> newPartitions = new ConcurrentHashMap<>();
|
||||||
partitionSizesMap.forEach((queueKey, size) -> {
|
partitionSizesMap.forEach((queueKey, size) -> {
|
||||||
for (int i = 0; i < size; i++) {
|
for (int i = 0; i < size; i++) {
|
||||||
|
try {
|
||||||
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i);
|
ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(queueKey), queueKey, i);
|
||||||
|
log.trace("Server responsible for {}[{}] - {}", queueKey, i, serviceInfo != null ? serviceInfo.getServiceId() : "none");
|
||||||
if (currentService.equals(serviceInfo)) {
|
if (currentService.equals(serviceInfo)) {
|
||||||
newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
|
newPartitions.computeIfAbsent(queueKey, key -> new ArrayList<>()).add(i);
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warn("Failed to resolve server responsible for {}[{}]", queueKey, i, e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -399,7 +415,7 @@ public class HashPartitionService implements PartitionService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeTenant(TenantId tenantId) {
|
public void evictTenantInfo(TenantId tenantId) {
|
||||||
tenantRoutingInfoMap.remove(tenantId);
|
tenantRoutingInfoMap.remove(tenantId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -59,7 +59,7 @@ public interface PartitionService {
|
|||||||
|
|
||||||
int resolvePartitionIndex(UUID entityId, int partitions);
|
int resolvePartitionIndex(UUID entityId, int partitions);
|
||||||
|
|
||||||
void removeTenant(TenantId tenantId);
|
void evictTenantInfo(TenantId tenantId);
|
||||||
|
|
||||||
int countTransportsByType(String type);
|
int countTransportsByType(String type);
|
||||||
|
|
||||||
@ -67,6 +67,8 @@ public interface PartitionService {
|
|||||||
|
|
||||||
void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg);
|
void removeQueue(TransportProtos.QueueDeleteMsg queueDeleteMsg);
|
||||||
|
|
||||||
|
void removeTenant(TenantId tenantId);
|
||||||
|
|
||||||
boolean isManagedByCurrentService(TenantId tenantId);
|
boolean isManagedByCurrentService(TenantId tenantId);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -96,6 +96,7 @@ import org.thingsboard.server.queue.TbQueueProducer;
|
|||||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||||
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
|
import org.thingsboard.server.queue.common.AsyncCallbackTemplate;
|
||||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||||
|
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||||
import org.thingsboard.server.queue.discovery.TopicService;
|
import org.thingsboard.server.queue.discovery.TopicService;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
@ -1001,8 +1002,8 @@ public class DefaultTransportService implements TransportService {
|
|||||||
Optional<Tenant> profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
|
Optional<Tenant> profileOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray());
|
||||||
if (profileOpt.isPresent()) {
|
if (profileOpt.isPresent()) {
|
||||||
Tenant tenant = profileOpt.get();
|
Tenant tenant = profileOpt.get();
|
||||||
partitionService.removeTenant(tenant.getId());
|
|
||||||
boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId());
|
boolean updated = tenantProfileCache.put(tenant.getId(), tenant.getTenantProfileId());
|
||||||
|
partitionService.evictTenantInfo(tenant.getId());
|
||||||
if (updated) {
|
if (updated) {
|
||||||
rateLimitService.update(tenant.getId());
|
rateLimitService.update(tenant.getId());
|
||||||
}
|
}
|
||||||
@ -1027,7 +1028,9 @@ public class DefaultTransportService implements TransportService {
|
|||||||
} else if (EntityType.TENANT_PROFILE.equals(entityType)) {
|
} else if (EntityType.TENANT_PROFILE.equals(entityType)) {
|
||||||
tenantProfileCache.remove(new TenantProfileId(entityUuid));
|
tenantProfileCache.remove(new TenantProfileId(entityUuid));
|
||||||
} else if (EntityType.TENANT.equals(entityType)) {
|
} else if (EntityType.TENANT.equals(entityType)) {
|
||||||
rateLimitService.remove(TenantId.fromUUID(entityUuid));
|
TenantId tenantId = TenantId.fromUUID(entityUuid);
|
||||||
|
rateLimitService.remove(tenantId);
|
||||||
|
partitionService.removeTenant(tenantId);
|
||||||
} else if (EntityType.DEVICE.equals(entityType)) {
|
} else if (EntityType.DEVICE.equals(entityType)) {
|
||||||
rateLimitService.remove(new DeviceId(entityUuid));
|
rateLimitService.remove(new DeviceId(entityUuid));
|
||||||
onDeviceDeleted(new DeviceId(entityUuid));
|
onDeviceDeleted(new DeviceId(entityUuid));
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user