diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index b04291a303..c073aa6a95 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; @@ -37,15 +36,16 @@ import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; import org.thingsboard.server.queue.TbQueueProducer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; -import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; -import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent; +import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.state.DefaultDeviceStateService; @@ -154,7 +154,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) { entitySubscriptions.values().removeIf(sub -> - !partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()).isMyPartition()); + !partitionService.isMyPartition(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId())); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index bfc50e076b..de3c394c22 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -251,7 +251,12 @@ public class HashPartitionService implements PartitionService { @Override public boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId) { - return resolve(serviceType, tenantId, entityId).isMyPartition(); + try { + return resolve(serviceType, tenantId, entityId).isMyPartition(); + } catch (TenantNotFoundException e) { + log.warn("Tenant with id {} not found", tenantId, new RuntimeException("stacktrace")); + return false; + } } private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) { @@ -376,7 +381,12 @@ public class HashPartitionService implements PartitionService { .collect(Collectors.toList())) .collect(Collectors.joining(System.lineSeparator()))); } - applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap)); + PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap); + try { + applicationEventPublisher.publishEvent(event); + } catch (Exception e) { + log.error("Failed to publish partition change event {}", event, e); + } } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java index 8e805aac3d..341ab0f1eb 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/TbApplicationEventListener.java @@ -41,7 +41,11 @@ public abstract class TbApplicationEventListener i seqNumberLock.unlock(); } if (validUpdate && filterTbApplicationEvent(event)) { - onTbApplicationEvent(event); + try { + onTbApplicationEvent(event); + } catch (Exception e) { + log.error("Failed to handle partition change event: {}", event, e); + } } else { log.info("Application event ignored due to invalid sequence number ({} > {}). Event: {}", lastProcessedSequenceNumber, event.getSequenceNumber(), event); } diff --git a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java index fa45ddf98c..1d78b5ddd1 100644 --- a/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java +++ b/common/version-control/src/main/java/org/thingsboard/server/service/sync/vc/DefaultClusterVersionControlService.java @@ -160,7 +160,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe @Override protected void onTbApplicationEvent(PartitionChangeEvent event) { for (TenantId tenantId : vcService.getActiveRepositoryTenants()) { - if (!partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId).isMyPartition()) { + if (!partitionService.isMyPartition(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId)) { var lock = getRepoLock(tenantId); lock.lock(); try {