From 9d14a38966c5351edd16c01b339a9ff3c19e88e7 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Tue, 20 Oct 2020 18:57:15 +0300 Subject: [PATCH] Tenant Profile updates for ApiUsageStateService --- .../server/actors/app/AppActor.java | 9 +- .../DefaultTbApiUsageStateService.java | 37 +++++- .../apiusage/TbApiUsageStateService.java | 6 +- .../service/apiusage/TenantApiUsageState.java | 122 +++++++++++++++++- .../queue/DefaultTbCoreConsumerService.java | 2 +- .../DefaultTbRuleEngineConsumerService.java | 4 +- .../processing/AbstractConsumerService.java | 18 ++- .../dao/usagerecord/ApiUsageStateService.java | 5 +- .../server/common/data/ApiUsageState.java | 30 ++--- .../ApiApiUsageStateServiceImpl.java | 8 ++ 10 files changed, 197 insertions(+), 44 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java index 826596db56..fea5558540 100644 --- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java @@ -40,8 +40,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; import org.thingsboard.server.common.msg.queue.RuleEngineException; import org.thingsboard.server.common.msg.queue.ServiceType; -import org.thingsboard.server.dao.model.ModelConstants; -import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.service.profile.TbTenantProfileCache; import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; @@ -150,15 +148,12 @@ public class AppActor extends ContextAwareActor { private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) { TbActorRef target = null; if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) { - if (msg.getEntityId().getEntityType() == EntityType.TENANT_PROFILE) { - tenantProfileCache.evict(new TenantProfileId(msg.getEntityId().getId())); - } else { + if (!EntityType.TENANT_PROFILE.equals(msg.getEntityId().getEntityType())) { log.warn("Message has system tenant id: {}", msg); } } else { - if (msg.getEntityId().getEntityType() == EntityType.TENANT) { + if (EntityType.TENANT.equals(msg.getEntityId().getEntityType())) { TenantId tenantId = new TenantId(msg.getEntityId().getId()); - tenantProfileCache.evict(tenantId); if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); deletedTenants.add(tenantId); diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 1b6cdf3cff..84801224e1 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -87,6 +88,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); TenantApiUsageState tenantState; List updatedEntries; + boolean stateUpdated = false; updateLock.lock(); try { tenantState = getOrFetchState(tenantId); @@ -101,13 +103,21 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); long newValue = tenantState.add(recordKey, kvProto.getValue()); updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue))); - newValue = tenantState.addToHourly(recordKey, kvProto.getValue()); - updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newValue))); + long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue()); + updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newHourlyValue))); + stateUpdated |= tenantState.checkStateUpdatedDueToThreshold(recordKey); } } finally { updateLock.unlock(); } tsService.save(tenantId, tenantState.getEntityId(), updatedEntries, 0L); + if (stateUpdated) { + // Save new state into the database; + apiUsageStateService.update(tenantState.getApiUsageState()); + //TODO: clear cache on cluster repartition. + //TODO: update profiles on tenant and profile updates. + //TODO: broadcast to everyone notifications about enabled/disabled features. + } } @Override @@ -116,12 +126,26 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { } @Override - public void onAddedToAllowList(TenantId tenantId) { - + public void onTenantProfileUpdate(TenantProfileId tenantProfileId) { + TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId); + updateLock.lock(); + try { + tenantStates.values().forEach(state -> { + if (tenantProfile.getId().equals(state.getTenantProfileId())) { + state.setTenantProfileData(tenantProfile.getProfileData()); + if (state.checkStateUpdatedDueToThresholds()) { + apiUsageStateService.update(state.getApiUsageState()); + //TODO: send notification to cluster; + } + } + }); + } finally { + updateLock.unlock(); + } } @Override - public void onAddedToDenyList(TenantId tenantId) { + public void onTenantUpdate(TenantId tenantId) { } @@ -150,7 +174,8 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService { dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); } } - tenantState = new TenantApiUsageState(dbStateEntity.getEntityId()); + TenantProfile tenantProfile = tenantProfileCache.get(tenantId); + tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity); try { List dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get(); for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java index e79b79d069..d17bd9a3dd 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TbApiUsageStateService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.apiusage; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; import org.thingsboard.server.queue.common.TbProtoQueueMsg; @@ -26,8 +27,7 @@ public interface TbApiUsageStateService { TenantApiUsageState getApiUsageState(TenantId tenantId); - void onAddedToAllowList(TenantId tenantId); - - void onAddedToDenyList(TenantId tenantId); + void onTenantProfileUpdate(TenantProfileId tenantProfileId); + void onTenantUpdate(TenantId tenantId); } diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java index 29335c3326..0492ba9793 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java @@ -16,8 +16,13 @@ package org.thingsboard.server.service.apiusage; import lombok.Getter; +import lombok.Setter; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.TenantProfileData; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.msg.tools.SchedulerUtils; import java.util.Map; @@ -29,7 +34,13 @@ public class TenantApiUsageState { private final Map currentHourValues = new ConcurrentHashMap<>(); @Getter - private final EntityId entityId; + @Setter + private TenantProfileId tenantProfileId; + @Getter + @Setter + private TenantProfileData tenantProfileData; + @Getter + private final ApiUsageState apiUsageState; @Getter private volatile long currentCycleTs; @Getter @@ -37,8 +48,10 @@ public class TenantApiUsageState { @Getter private volatile long currentHourTs; - public TenantApiUsageState(EntityId entityId) { - this.entityId = entityId; + public TenantApiUsageState(TenantProfile tenantProfile, ApiUsageState apiUsageState) { + this.tenantProfileId = tenantProfile.getId(); + this.tenantProfileData = tenantProfile.getProfileData(); + this.apiUsageState = apiUsageState; this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); @@ -58,6 +71,10 @@ public class TenantApiUsageState { return result; } + public long get(ApiUsageRecordKey key) { + return currentCycleValues.getOrDefault(key, 0L); + } + public long addToHourly(ApiUsageRecordKey key, long value) { long result = currentHourValues.getOrDefault(key, 0L) + value; currentHourValues.put(key, result); @@ -79,4 +96,103 @@ public class TenantApiUsageState { } } + public long getProfileThreshold(ApiUsageRecordKey key) { + Object threshold = tenantProfileData.getProperties().get(key.name()); + if (threshold != null) { + if (threshold instanceof String) { + return Long.parseLong((String) threshold); + } else if (threshold instanceof Long) { + return (Long) threshold; + } + } + return 0L; + } + + public EntityId getEntityId() { + return apiUsageState.getEntityId(); + } + + public boolean isTransportEnabled() { + return apiUsageState.isTransportEnabled(); + } + + public boolean isDbStorageEnabled() { + return apiUsageState.isDbStorageEnabled(); + } + + public boolean isRuleEngineEnabled() { + return apiUsageState.isRuleEngineEnabled(); + } + + public boolean isJsExecEnabled() { + return apiUsageState.isJsExecEnabled(); + } + + public void setTransportEnabled(boolean transportEnabled) { + apiUsageState.setTransportEnabled(transportEnabled); + } + + public void setDbStorageEnabled(boolean dbStorageEnabled) { + apiUsageState.setDbStorageEnabled(dbStorageEnabled); + } + + public void setRuleEngineEnabled(boolean ruleEngineEnabled) { + apiUsageState.setRuleEngineEnabled(ruleEngineEnabled); + } + + public void setJsExecEnabled(boolean jsExecEnabled) { + apiUsageState.setJsExecEnabled(jsExecEnabled); + } + + public boolean isFeatureEnabled(ApiUsageRecordKey recordKey) { + switch (recordKey) { + case MSG_COUNT: + case MSG_BYTES_COUNT: + case DP_TRANSPORT_COUNT: + return isTransportEnabled(); + case RE_EXEC_COUNT: + return isRuleEngineEnabled(); + case DP_STORAGE_COUNT: + return isDbStorageEnabled(); + case JS_EXEC_COUNT: + return isJsExecEnabled(); + default: + return true; + } + } + + public boolean setFeatureValue(ApiUsageRecordKey recordKey, boolean value) { + boolean currentValue = isFeatureEnabled(recordKey); + switch (recordKey) { + case MSG_COUNT: + case MSG_BYTES_COUNT: + case DP_TRANSPORT_COUNT: + setTransportEnabled(value); + break; + case RE_EXEC_COUNT: + setRuleEngineEnabled(value); + break; + case DP_STORAGE_COUNT: + setDbStorageEnabled(value); + break; + case JS_EXEC_COUNT: + setJsExecEnabled(value); + break; + } + return currentValue == value; + } + + public boolean checkStateUpdatedDueToThresholds() { + boolean update = false; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + update |= checkStateUpdatedDueToThreshold(key); + } + return update; + } + + public boolean checkStateUpdatedDueToThreshold(ApiUsageRecordKey recordKey) { + long value = get(recordKey); + long threshold = getProfileThreshold(recordKey); + return setFeatureValue(recordKey, threshold == 0 || value < threshold); + } } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index 929d5000f0..4ad073ead8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -107,7 +107,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService> nfConsumer; public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService, - TbDeviceProfileCache deviceProfileCache, TbQueueConsumer> nfConsumer) { + TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, TbApiUsageStateService apiUsageStateService, TbQueueConsumer> nfConsumer) { this.actorContext = actorContext; this.encodingService = encodingService; + this.tenantProfileCache = tenantProfileCache; this.deviceProfileCache = deviceProfileCache; + this.apiUsageStateService = apiUsageStateService; this.nfConsumer = nfConsumer; } @@ -143,7 +150,14 @@ public abstract class AbstractConsumerService implements HasTenan private static final long serialVersionUID = 8250339805336035966L; + @Getter @Setter private TenantId tenantId; + @Getter @Setter private EntityId entityId; + @Getter @Setter + private boolean transportEnabled; + @Getter @Setter + private boolean dbStorageEnabled; + @Getter @Setter + private boolean ruleEngineEnabled; + @Getter @Setter + private boolean jsExecEnabled; public ApiUsageState() { super(); @@ -43,22 +55,4 @@ public class ApiUsageState extends BaseData implements HasTenan this.tenantId = ur.getTenantId(); this.entityId = ur.getEntityId(); } - - @Override - public TenantId getTenantId() { - return tenantId; - } - - public void setTenantId(TenantId tenantId) { - this.tenantId = tenantId; - } - - public EntityId getEntityId() { - return entityId; - } - - public void setEntityId(EntityId entityId) { - this.entityId = entityId; - } - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java index 6be56a1815..7dc390046e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiApiUsageStateServiceImpl.java @@ -59,6 +59,14 @@ public class ApiApiUsageStateServiceImpl extends AbstractEntityService implement return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState); } + @Override + public ApiUsageState update(ApiUsageState apiUsageState) { + log.trace("Executing save [{}]", apiUsageState.getTenantId()); + validateId(apiUsageState.getTenantId(), INCORRECT_TENANT_ID + apiUsageState.getTenantId()); + validateId(apiUsageState.getId(), "Can't save new usage state. Only update is allowed!"); + return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState); + } + @Override public ApiUsageState findTenantApiUsageState(TenantId tenantId) { log.trace("Executing findTenantUsageRecord, tenantId [{}]", tenantId);