Merge pull request #10093 from thingsboard/fix/recalculate-partitions-error
Error handling for PartitionChangeEvent listeners
This commit is contained in:
commit
4df22aa063
@ -19,7 +19,6 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
@ -37,15 +36,16 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.queue.TbQueueProducer;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.discovery.event.OtherServiceShutdownEvent;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.state.DefaultDeviceStateService;
|
||||
@ -154,7 +154,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
|
||||
if (ServiceType.TB_CORE.equals(partitionChangeEvent.getServiceType())) {
|
||||
entitySubscriptions.values().removeIf(sub ->
|
||||
!partitionService.resolve(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()).isMyPartition());
|
||||
!partitionService.isMyPartition(ServiceType.TB_CORE, sub.getTenantId(), sub.getEntityId()));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -251,7 +251,12 @@ public class HashPartitionService implements PartitionService {
|
||||
|
||||
@Override
|
||||
public boolean isMyPartition(ServiceType serviceType, TenantId tenantId, EntityId entityId) {
|
||||
try {
|
||||
return resolve(serviceType, tenantId, entityId).isMyPartition();
|
||||
} catch (TenantNotFoundException e) {
|
||||
log.warn("Tenant with id {} not found", tenantId, new RuntimeException("stacktrace"));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private TopicPartitionInfo resolve(QueueKey queueKey, EntityId entityId) {
|
||||
@ -376,7 +381,12 @@ public class HashPartitionService implements PartitionService {
|
||||
.collect(Collectors.toList()))
|
||||
.collect(Collectors.joining(System.lineSeparator())));
|
||||
}
|
||||
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceType, partitionsMap));
|
||||
PartitionChangeEvent event = new PartitionChangeEvent(this, serviceType, partitionsMap);
|
||||
try {
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to publish partition change event {}", event, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -41,7 +41,11 @@ public abstract class TbApplicationEventListener<T extends TbApplicationEvent> i
|
||||
seqNumberLock.unlock();
|
||||
}
|
||||
if (validUpdate && filterTbApplicationEvent(event)) {
|
||||
try {
|
||||
onTbApplicationEvent(event);
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to handle partition change event: {}", event, e);
|
||||
}
|
||||
} else {
|
||||
log.info("Application event ignored due to invalid sequence number ({} > {}). Event: {}", lastProcessedSequenceNumber, event.getSequenceNumber(), event);
|
||||
}
|
||||
|
||||
@ -160,7 +160,7 @@ public class DefaultClusterVersionControlService extends TbApplicationEventListe
|
||||
@Override
|
||||
protected void onTbApplicationEvent(PartitionChangeEvent event) {
|
||||
for (TenantId tenantId : vcService.getActiveRepositoryTenants()) {
|
||||
if (!partitionService.resolve(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId).isMyPartition()) {
|
||||
if (!partitionService.isMyPartition(ServiceType.TB_VC_EXECUTOR, tenantId, tenantId)) {
|
||||
var lock = getRepoLock(tenantId);
|
||||
lock.lock();
|
||||
try {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user