Merge branch 'usage-service-improvements' of https://github.com/YevhenBondarenko/thingsboard

This commit is contained in:
Andrii Shvaika 2022-02-18 12:34:37 +02:00
commit a7da56c270
3 changed files with 217 additions and 35 deletions

View File

@ -16,6 +16,10 @@
package org.thingsboard.server.service.apiusage; package org.thingsboard.server.service.apiusage;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.Nullable;
@ -23,9 +27,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiFeature; import org.thingsboard.server.common.data.ApiFeature;
import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageState;
@ -49,8 +53,8 @@ import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfigurat
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.tools.SchedulerUtils; import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -58,11 +62,11 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.telemetry.InternalTelemetryService; import org.thingsboard.server.service.telemetry.InternalTelemetryService;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -73,13 +77,15 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -102,23 +108,25 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
private final TbClusterService clusterService; private final TbClusterService clusterService;
private final PartitionService partitionService; private final PartitionService partitionService;
private final TenantService tenantService; private final TenantService tenantService;
private final CustomerService customerService;
private final TimeseriesService tsService; private final TimeseriesService tsService;
private final ApiUsageStateService apiUsageStateService; private final ApiUsageStateService apiUsageStateService;
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
private final TbTenantProfileCache tenantProfileCache; private final TbTenantProfileCache tenantProfileCache;
private final MailService mailService; private final MailService mailService;
private final DbCallbackExecutorService dbExecutor;
@Lazy @Lazy
@Autowired @Autowired
private InternalTelemetryService tsWsService; private InternalTelemetryService tsWsService;
// Entities that should be processed on this server // Entities that should be processed on this server
private final Map<EntityId, BaseApiUsageState> myUsageStates = new ConcurrentHashMap<>(); final Map<EntityId, BaseApiUsageState> myUsageStates = new ConcurrentHashMap<>();
// Entities that should be processed on other servers // Entities that should be processed on other servers
private final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>(); final Map<EntityId, ApiUsageState> otherUsageStates = new ConcurrentHashMap<>();
private final Set<EntityId> deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>()); final ConcurrentMap<TopicPartitionInfo, Set<EntityId>> partitionedEntities = new ConcurrentHashMap<>();
final Set<EntityId> deletedEntities = Collections.newSetFromMap(new ConcurrentHashMap<>());
@Value("${usage.stats.report.enabled:true}") @Value("${usage.stats.report.enabled:true}")
private boolean enabled; private boolean enabled;
@ -130,25 +138,29 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
private final ExecutorService mailExecutor; private final ExecutorService mailExecutor;
private ListeningScheduledExecutorService tenantStateExecutor;
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
public DefaultTbApiUsageStateService(TbClusterService clusterService, public DefaultTbApiUsageStateService(TbClusterService clusterService,
PartitionService partitionService, PartitionService partitionService,
TenantService tenantService, TenantService tenantService,
CustomerService customerService,
TimeseriesService tsService, TimeseriesService tsService,
ApiUsageStateService apiUsageStateService, ApiUsageStateService apiUsageStateService,
SchedulerComponent scheduler, SchedulerComponent scheduler,
TbTenantProfileCache tenantProfileCache, TbTenantProfileCache tenantProfileCache,
MailService mailService) { MailService mailService,
DbCallbackExecutorService dbExecutor) {
this.clusterService = clusterService; this.clusterService = clusterService;
this.partitionService = partitionService; this.partitionService = partitionService;
this.tenantService = tenantService; this.tenantService = tenantService;
this.customerService = customerService;
this.tsService = tsService; this.tsService = tsService;
this.apiUsageStateService = apiUsageStateService; this.apiUsageStateService = apiUsageStateService;
this.scheduler = scheduler; this.scheduler = scheduler;
this.tenantProfileCache = tenantProfileCache; this.tenantProfileCache = tenantProfileCache;
this.mailService = mailService; this.mailService = mailService;
this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail")); this.mailExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("api-usage-svc-mail"));
this.dbExecutor = dbExecutor;
} }
@PostConstruct @PostConstruct
@ -158,6 +170,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
scheduler.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS); scheduler.scheduleAtFixedRate(this::checkStartOfNextCycle, nextCycleCheckInterval, nextCycleCheckInterval, TimeUnit.MILLISECONDS);
log.info("Started api usage service."); log.info("Started api usage service.");
} }
tenantStateExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tenant-state")));
} }
@Override @Override
@ -219,13 +232,53 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
@Override @Override
protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) { if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
myUsageStates.entrySet().removeIf(entry -> { subscribeQueue.add(partitionChangeEvent.getPartitions());
return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); tenantStateExecutor.submit(this::pollInitStateFromDB);
}
}
void pollInitStateFromDB() {
final Set<TopicPartitionInfo> partitions = getLatestPartitionsFromQueue();
if (partitions == null) {
log.info("Tenant state service. Nothing to do. partitions is null");
return;
}
initStateFromDB(partitions);
}
Set<TopicPartitionInfo> getLatestPartitionsFromQueue() {
log.debug("getLatestPartitionsFromQueue, queue size {}", subscribeQueue.size());
Set<TopicPartitionInfo> partitions = null;
while (!subscribeQueue.isEmpty()) {
partitions = subscribeQueue.poll();
log.debug("polled from the queue partitions {}", partitions);
}
log.debug("getLatestPartitionsFromQueue, partitions {}", partitions);
return partitions;
}
private void initStateFromDB(Set<TopicPartitionInfo> partitions) {
try {
Set<TopicPartitionInfo> addedPartitions = new HashSet<>(partitions);
addedPartitions.removeAll(partitionedEntities.keySet());
Set<TopicPartitionInfo> removedPartitions = new HashSet<>(partitionedEntities.keySet());
removedPartitions.removeAll(partitions);
removedPartitions.forEach(partition -> {
Set<EntityId> entities = partitionedEntities.remove(partition);
entities.forEach(this::cleanUpEntitiesStateMap);
}); });
otherUsageStates.entrySet().removeIf(entry -> {
return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); addedPartitions.forEach(tpi ->
}); partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
initStatesFromDataBase();
otherUsageStates.entrySet().removeIf(entry ->
partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition());
initStatesFromDataBase(addedPartitions);
} catch (Throwable t) {
log.warn("Failed to init tenant states from DB", t);
} }
} }
@ -311,6 +364,18 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
oldProfileData.getConfiguration(), profile.getProfileData().getConfiguration()); oldProfileData.getConfiguration(), profile.getProfileData().getConfiguration());
} }
private void addEntityState(TopicPartitionInfo tpi, BaseApiUsageState state) {
EntityId entityId = state.getEntityId();
Set<EntityId> entityIds = partitionedEntities.get(tpi);
if (entityIds != null) {
entityIds.add(entityId);
myUsageStates.put(entityId, state);
} else {
log.debug("[{}] belongs to external partition {}", entityId, tpi.getFullTopicName());
throw new RuntimeException(entityId.getEntityType() + " belongs to external partition " + tpi.getFullTopicName() + "!");
}
}
private void updateProfileThresholds(TenantId tenantId, ApiUsageStateId id, private void updateProfileThresholds(TenantId tenantId, ApiUsageStateId id,
TenantProfileConfiguration oldData, TenantProfileConfiguration newData) { TenantProfileConfiguration oldData, TenantProfileConfiguration newData) {
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
@ -339,6 +404,10 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
myUsageStates.remove(customerId); myUsageStates.remove(customerId);
} }
private void cleanUpEntitiesStateMap(EntityId entityId) {
myUsageStates.remove(entityId);
}
private void persistAndNotify(BaseApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) { private void persistAndNotify(BaseApiUsageState state, Map<ApiFeature, ApiUsageStateValue> result) {
log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result); log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result);
apiUsageStateService.update(state.getApiUsageState()); apiUsageStateService.update(state.getApiUsageState());
@ -420,7 +489,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK); tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), counts, VOID_CALLBACK);
} }
private BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) { BaseApiUsageState getOrFetchState(TenantId tenantId, EntityId entityId) {
if (entityId == null || entityId.isNullUid()) { if (entityId == null || entityId.isNullUid()) {
entityId = tenantId; entityId = tenantId;
} }
@ -473,7 +542,12 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
} }
} }
log.debug("[{}] Initialized state: {}", entityId, storedState); log.debug("[{}] Initialized state: {}", entityId, storedState);
myUsageStates.put(entityId, state); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (tpi.isMyPartition()) {
addEntityState(tpi, state);
} else {
otherUsageStates.put(entityId, state.getApiUsageState());
}
saveNewCounts(state, newCounts); saveNewCounts(state, newCounts);
} catch (InterruptedException | ExecutionException e) { } catch (InterruptedException | ExecutionException e) {
log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e);
@ -482,38 +556,41 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
return state; return state;
} }
private void initStatesFromDataBase() { private void initStatesFromDataBase(Set<TopicPartitionInfo> addedPartitions) {
if (addedPartitions.isEmpty()) {
return;
}
try { try {
log.info("Initializing tenant states."); log.info("Initializing tenant states.");
updateLock.lock(); updateLock.lock();
try {
ExecutorService tmpInitExecutor = ThingsBoardExecutors.newWorkStealingPool(20, "init-tenant-states-from-db");
try { try {
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
List<Future<?>> futures = new ArrayList<>(); List<ListenableFuture<?>> futures = new ArrayList<>();
for (Tenant tenant : tenantIterator) { for (Tenant tenant : tenantIterator) {
if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId());
if (addedPartitions.contains(tpi)) {
if (!myUsageStates.containsKey(tenant.getId()) && tpi.isMyPartition()) {
log.debug("[{}] Initializing tenant state.", tenant.getId()); log.debug("[{}] Initializing tenant state.", tenant.getId());
futures.add(tmpInitExecutor.submit(() -> { futures.add(dbExecutor.submit(() -> {
try { try {
updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId()));
log.debug("[{}] Initialized tenant state.", tenant.getId()); log.debug("[{}] Initialized tenant state.", tenant.getId());
} catch (Exception e) { } catch (Exception e) {
log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e); log.warn("[{}] Failed to initialize tenant API state", tenant.getId(), e);
} }
return null;
})); }));
} }
} else {
log.debug("[{}][{}] Tenant doesn't belong to current partition. tpi [{}]", tenant.getName(), tenant.getId(), tpi);
} }
for (Future<?> future : futures) {
future.get();
}
} finally {
tmpInitExecutor.shutdownNow();
} }
Futures.whenAllComplete(futures);
} finally { } finally {
updateLock.unlock(); updateLock.unlock();
} }
log.info("Initialized tenant states."); log.info("Initialized {} tenant states.", myUsageStates.size());
} catch (Exception e) { } catch (Exception e) {
log.warn("Unknown failure", e); log.warn("Unknown failure", e);
} }
@ -524,5 +601,8 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener<Pa
if (mailExecutor != null) { if (mailExecutor != null) {
mailExecutor.shutdownNow(); mailExecutor.shutdownNow();
} }
if (tenantStateExecutor != null) {
tenantStateExecutor.shutdownNow();
}
} }
} }

View File

@ -0,0 +1,101 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.apiusage;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import java.util.UUID;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@RunWith(MockitoJUnitRunner.class)
public class DefaultTbApiUsageStateServiceTest {
@Mock
TenantService tenantService;
@Mock
TimeseriesService tsService;
@Mock
TbClusterService clusterService;
@Mock
PartitionService partitionService;
@Mock
TenantApiUsageState tenantUsageStateMock;
@Mock
ApiUsageStateService apiUsageStateService;
@Mock
SchedulerComponent scheduler;
@Mock
TbTenantProfileCache tenantProfileCache;
@Mock
MailService mailService;
@Mock
DbCallbackExecutorService dbExecutor;
TenantId tenantId = TenantId.fromUUID(UUID.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"));
DefaultTbApiUsageStateService service;
@Before
public void setUp() {
service = spy(new DefaultTbApiUsageStateService(clusterService, partitionService, tenantService, tsService, apiUsageStateService, scheduler, tenantProfileCache, mailService, dbExecutor));
}
@Test
public void givenTenantIdFromEntityStatesMap_whenGetApiUsageState() {
service.myUsageStates.put(tenantId, tenantUsageStateMock);
ApiUsageState tenantUsageState = service.getApiUsageState(tenantId);
assertThat(tenantUsageState, is(tenantUsageStateMock.getApiUsageState()));
Mockito.verify(service, never()).getOrFetchState(tenantId, tenantId);
}
@Test
public void givenTenantIdWithoutTenantStateInMap_whenGetState_thenGetOrFetchState() {
TopicPartitionInfo tpi = Mockito.mock(TopicPartitionInfo.class);
Mockito.when(tpi.isMyPartition()).thenReturn(true);
willReturn(tpi).given(partitionService).resolve(ServiceType.TB_CORE, tenantId, tenantId);
service.myUsageStates.clear();
willReturn(tenantUsageStateMock).given(service).getOrFetchState(tenantId, tenantId);
ApiUsageState tenantUsageState = service.getApiUsageState(tenantId);
assertThat(tenantUsageState, is(tenantUsageStateMock.getApiUsageState()));
assertThat(true, is(service.partitionedEntities.get(tpi).contains(tenantId)));
Mockito.verify(service, times(1)).getOrFetchState(tenantId, tenantId);
}
}

View File

@ -158,6 +158,8 @@ public class HashPartitionService implements PartitionService {
} }
}); });
tpiCache.clear();
oldPartitions.forEach((serviceQueueKey, partitions) -> { oldPartitions.forEach((serviceQueueKey, partitions) -> {
if (!myPartitions.containsKey(serviceQueueKey)) { if (!myPartitions.containsKey(serviceQueueKey)) {
log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey); log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey);
@ -174,7 +176,6 @@ public class HashPartitionService implements PartitionService {
applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList)); applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList));
} }
}); });
tpiCache.clear();
if (currentOtherServices == null) { if (currentOtherServices == null) {
currentOtherServices = new ArrayList<>(otherServices); currentOtherServices = new ArrayList<>(otherServices);