diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 84801224e1..003ee72807 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.tools.SchedulerUtils; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -33,6 +34,8 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionChangeEvent; +import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.profile.TbTenantProfileCache; @@ -54,6 +57,7 @@ import java.util.concurrent.locks.ReentrantLock; public class DefaultTbApiUsageStateService implements TbApiUsageStateService { public static final String HOURLY = "HOURLY_"; + private final PartitionService partitionService; private final ApiUsageStateService apiUsageStateService; private final TimeseriesService tsService; private final SchedulerComponent scheduler; @@ -68,7 +72,8 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { private final Lock updateLock = new ReentrantLock(); - public DefaultTbApiUsageStateService(ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) { + public DefaultTbApiUsageStateService(PartitionService partitionService, ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) { + this.partitionService = partitionService; this.apiUsageStateService = apiUsageStateService; this.tsService = tsService; this.scheduler = scheduler; @@ -120,8 +125,16 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { } } + @Override + public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) { + if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) { + tenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); + } + } + @Override public TenantApiUsageState getApiUsageState(TenantId tenantId) { + //We should always get it from the map of from the database; return null; } @@ -132,11 +145,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { try { tenantStates.values().forEach(state -> { if (tenantProfile.getId().equals(state.getTenantProfileId())) { - state.setTenantProfileData(tenantProfile.getProfileData()); - if (state.checkStateUpdatedDueToThresholds()) { - apiUsageStateService.update(state.getApiUsageState()); - //TODO: send notification to cluster; - } + updateTenantState(state, tenantProfile); } }); } finally { @@ -146,7 +155,25 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { @Override public void onTenantUpdate(TenantId tenantId) { + TenantProfile tenantProfile = tenantProfileCache.get(tenantId); + updateLock.lock(); + try { + TenantApiUsageState state = tenantStates.get(tenantId); + if (state != null && !state.getTenantProfileId().equals(tenantProfile.getId())) { + updateTenantState(state, tenantProfile); + } + } finally { + updateLock.unlock(); + } + } + + private void updateTenantState(TenantApiUsageState state, TenantProfile tenantProfile) { + state.setTenantProfileData(tenantProfile.getProfileData()); + if (state.checkStateUpdatedDueToThresholds()) { + apiUsageStateService.update(state.getApiUsageState()); + //TODO: send notification to cluster; + } } private void checkStartOfNextCycle() { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index d17bd9a3dd..4816dc1037 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,13 +15,15 @@ */ package org.thingsboard.server.service.apiusage; +import org.springframework.context.ApplicationListener; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; +import org.thingsboard.server.queue.discovery.PartitionChangeEvent; -public interface TbApiUsageStateService { +public interface TbApiUsageStateService extends ApplicationListener { void process(TbProtoQueueMsg msg, TbCallback callback);