Cleanup of the map on repartition

This commit is contained in:
Andrii Shvaika 2020-10-21 11:13:48 +03:00
parent a922b8227f
commit ddfa10a63b
2 changed files with 38 additions and 9 deletions

View File

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.tools.SchedulerUtils; import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -33,6 +34,8 @@ import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto; import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.scheduler.SchedulerComponent; import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.profile.TbTenantProfileCache; import org.thingsboard.server.service.profile.TbTenantProfileCache;
@ -54,6 +57,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class DefaultTbApiUsageStateService implements TbApiUsageStateService { public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public static final String HOURLY = "HOURLY_"; public static final String HOURLY = "HOURLY_";
private final PartitionService partitionService;
private final ApiUsageStateService apiUsageStateService; private final ApiUsageStateService apiUsageStateService;
private final TimeseriesService tsService; private final TimeseriesService tsService;
private final SchedulerComponent scheduler; private final SchedulerComponent scheduler;
@ -68,7 +72,8 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
private final Lock updateLock = new ReentrantLock(); private final Lock updateLock = new ReentrantLock();
public DefaultTbApiUsageStateService(ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) { public DefaultTbApiUsageStateService(PartitionService partitionService, ApiUsageStateService apiUsageStateService, TimeseriesService tsService, SchedulerComponent scheduler, TbTenantProfileCache tenantProfileCache) {
this.partitionService = partitionService;
this.apiUsageStateService = apiUsageStateService; this.apiUsageStateService = apiUsageStateService;
this.tsService = tsService; this.tsService = tsService;
this.scheduler = scheduler; this.scheduler = scheduler;
@ -120,8 +125,16 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
} }
} }
@Override
public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) {
tenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition());
}
}
@Override @Override
public TenantApiUsageState getApiUsageState(TenantId tenantId) { public TenantApiUsageState getApiUsageState(TenantId tenantId) {
//We should always get it from the map of from the database;
return null; return null;
} }
@ -132,11 +145,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
try { try {
tenantStates.values().forEach(state -> { tenantStates.values().forEach(state -> {
if (tenantProfile.getId().equals(state.getTenantProfileId())) { if (tenantProfile.getId().equals(state.getTenantProfileId())) {
state.setTenantProfileData(tenantProfile.getProfileData()); updateTenantState(state, tenantProfile);
if (state.checkStateUpdatedDueToThresholds()) {
apiUsageStateService.update(state.getApiUsageState());
//TODO: send notification to cluster;
}
} }
}); });
} finally { } finally {
@ -146,7 +155,25 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
@Override @Override
public void onTenantUpdate(TenantId tenantId) { public void onTenantUpdate(TenantId tenantId) {
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
updateLock.lock();
try {
TenantApiUsageState state = tenantStates.get(tenantId);
if (state != null && !state.getTenantProfileId().equals(tenantProfile.getId())) {
updateTenantState(state, tenantProfile);
}
} finally {
updateLock.unlock();
}
}
private void updateTenantState(TenantApiUsageState state, TenantProfile tenantProfile) {
state.setTenantProfileData(tenantProfile.getProfileData());
if (state.checkStateUpdatedDueToThresholds()) {
apiUsageStateService.update(state.getApiUsageState());
//TODO: send notification to cluster;
}
} }
private void checkStartOfNextCycle() { private void checkStartOfNextCycle() {

View File

@ -15,13 +15,15 @@
*/ */
package org.thingsboard.server.service.apiusage; package org.thingsboard.server.service.apiusage;
import org.springframework.context.ApplicationListener;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
public interface TbApiUsageStateService { public interface TbApiUsageStateService extends ApplicationListener<PartitionChangeEvent> {
void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback); void process(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback);