Tenant Profile updates for ApiUsageStateService

This commit is contained in:
Andrii Shvaika 2020-10-20 18:57:15 +03:00
parent 9ec4b77672
commit 9d14a38966
10 changed files with 197 additions and 44 deletions

View File

@ -40,8 +40,6 @@ import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
import org.thingsboard.server.common.msg.queue.RuleEngineException;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
@ -150,15 +148,12 @@ public class AppActor extends ContextAwareActor {
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
TbActorRef target = null;
if (TenantId.SYS_TENANT_ID.equals(msg.getTenantId())) {
if (msg.getEntityId().getEntityType() == EntityType.TENANT_PROFILE) {
tenantProfileCache.evict(new TenantProfileId(msg.getEntityId().getId()));
} else {
if (!EntityType.TENANT_PROFILE.equals(msg.getEntityId().getEntityType())) {
log.warn("Message has system tenant id: {}", msg);
}
} else {
if (msg.getEntityId().getEntityType() == EntityType.TENANT) {
if (EntityType.TENANT.equals(msg.getEntityId().getEntityType())) {
TenantId tenantId = new TenantId(msg.getEntityId().getId());
tenantProfileCache.evict(tenantId);
if (msg.getEvent() == ComponentLifecycleEvent.DELETED) {
log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg);
deletedTenants.add(tenantId);

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
@ -87,6 +88,7 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB()));
TenantApiUsageState tenantState;
List<TsKvEntry> updatedEntries;
boolean stateUpdated = false;
updateLock.lock();
try {
tenantState = getOrFetchState(tenantId);
@ -101,13 +103,21 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
long newValue = tenantState.add(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue)));
newValue = tenantState.addToHourly(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newValue)));
long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newHourlyValue)));
stateUpdated |= tenantState.checkStateUpdatedDueToThreshold(recordKey);
}
} finally {
updateLock.unlock();
}
tsService.save(tenantId, tenantState.getEntityId(), updatedEntries, 0L);
if (stateUpdated) {
// Save new state into the database;
apiUsageStateService.update(tenantState.getApiUsageState());
//TODO: clear cache on cluster repartition.
//TODO: update profiles on tenant and profile updates.
//TODO: broadcast to everyone notifications about enabled/disabled features.
}
}
@Override
@ -116,12 +126,26 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
}
@Override
public void onAddedToAllowList(TenantId tenantId) {
public void onTenantProfileUpdate(TenantProfileId tenantProfileId) {
TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId);
updateLock.lock();
try {
tenantStates.values().forEach(state -> {
if (tenantProfile.getId().equals(state.getTenantProfileId())) {
state.setTenantProfileData(tenantProfile.getProfileData());
if (state.checkStateUpdatedDueToThresholds()) {
apiUsageStateService.update(state.getApiUsageState());
//TODO: send notification to cluster;
}
}
});
} finally {
updateLock.unlock();
}
}
@Override
public void onAddedToDenyList(TenantId tenantId) {
public void onTenantUpdate(TenantId tenantId) {
}
@ -150,7 +174,8 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId);
}
}
tenantState = new TenantApiUsageState(dbStateEntity.getEntityId());
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity);
try {
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get();
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.apiusage;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
@ -26,8 +27,7 @@ public interface TbApiUsageStateService {
TenantApiUsageState getApiUsageState(TenantId tenantId);
void onAddedToAllowList(TenantId tenantId);
void onAddedToDenyList(TenantId tenantId);
void onTenantProfileUpdate(TenantProfileId tenantProfileId);
void onTenantUpdate(TenantId tenantId);
}

View File

@ -16,8 +16,13 @@
package org.thingsboard.server.service.apiusage;
import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.TenantProfileData;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import java.util.Map;
@ -29,7 +34,13 @@ public class TenantApiUsageState {
private final Map<ApiUsageRecordKey, Long> currentHourValues = new ConcurrentHashMap<>();
@Getter
private final EntityId entityId;
@Setter
private TenantProfileId tenantProfileId;
@Getter
@Setter
private TenantProfileData tenantProfileData;
@Getter
private final ApiUsageState apiUsageState;
@Getter
private volatile long currentCycleTs;
@Getter
@ -37,8 +48,10 @@ public class TenantApiUsageState {
@Getter
private volatile long currentHourTs;
public TenantApiUsageState(EntityId entityId) {
this.entityId = entityId;
public TenantApiUsageState(TenantProfile tenantProfile, ApiUsageState apiUsageState) {
this.tenantProfileId = tenantProfile.getId();
this.tenantProfileData = tenantProfile.getProfileData();
this.apiUsageState = apiUsageState;
this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth();
this.nextCycleTs = SchedulerUtils.getStartOfNextMonth();
this.currentHourTs = SchedulerUtils.getStartOfCurrentHour();
@ -58,6 +71,10 @@ public class TenantApiUsageState {
return result;
}
public long get(ApiUsageRecordKey key) {
return currentCycleValues.getOrDefault(key, 0L);
}
public long addToHourly(ApiUsageRecordKey key, long value) {
long result = currentHourValues.getOrDefault(key, 0L) + value;
currentHourValues.put(key, result);
@ -79,4 +96,103 @@ public class TenantApiUsageState {
}
}
public long getProfileThreshold(ApiUsageRecordKey key) {
Object threshold = tenantProfileData.getProperties().get(key.name());
if (threshold != null) {
if (threshold instanceof String) {
return Long.parseLong((String) threshold);
} else if (threshold instanceof Long) {
return (Long) threshold;
}
}
return 0L;
}
public EntityId getEntityId() {
return apiUsageState.getEntityId();
}
public boolean isTransportEnabled() {
return apiUsageState.isTransportEnabled();
}
public boolean isDbStorageEnabled() {
return apiUsageState.isDbStorageEnabled();
}
public boolean isRuleEngineEnabled() {
return apiUsageState.isRuleEngineEnabled();
}
public boolean isJsExecEnabled() {
return apiUsageState.isJsExecEnabled();
}
public void setTransportEnabled(boolean transportEnabled) {
apiUsageState.setTransportEnabled(transportEnabled);
}
public void setDbStorageEnabled(boolean dbStorageEnabled) {
apiUsageState.setDbStorageEnabled(dbStorageEnabled);
}
public void setRuleEngineEnabled(boolean ruleEngineEnabled) {
apiUsageState.setRuleEngineEnabled(ruleEngineEnabled);
}
public void setJsExecEnabled(boolean jsExecEnabled) {
apiUsageState.setJsExecEnabled(jsExecEnabled);
}
public boolean isFeatureEnabled(ApiUsageRecordKey recordKey) {
switch (recordKey) {
case MSG_COUNT:
case MSG_BYTES_COUNT:
case DP_TRANSPORT_COUNT:
return isTransportEnabled();
case RE_EXEC_COUNT:
return isRuleEngineEnabled();
case DP_STORAGE_COUNT:
return isDbStorageEnabled();
case JS_EXEC_COUNT:
return isJsExecEnabled();
default:
return true;
}
}
public boolean setFeatureValue(ApiUsageRecordKey recordKey, boolean value) {
boolean currentValue = isFeatureEnabled(recordKey);
switch (recordKey) {
case MSG_COUNT:
case MSG_BYTES_COUNT:
case DP_TRANSPORT_COUNT:
setTransportEnabled(value);
break;
case RE_EXEC_COUNT:
setRuleEngineEnabled(value);
break;
case DP_STORAGE_COUNT:
setDbStorageEnabled(value);
break;
case JS_EXEC_COUNT:
setJsExecEnabled(value);
break;
}
return currentValue == value;
}
public boolean checkStateUpdatedDueToThresholds() {
boolean update = false;
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
update |= checkStateUpdatedDueToThreshold(key);
}
return update;
}
public boolean checkStateUpdatedDueToThreshold(ApiUsageRecordKey recordKey) {
long value = get(recordKey);
long threshold = getProfileThreshold(recordKey);
return setFeatureValue(recordKey, threshold == 0 || value < threshold);
}
}

View File

@ -107,7 +107,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
SubscriptionManagerService subscriptionManagerService, DataDecodingEncodingService encodingService,
TbCoreDeviceRpcService tbCoreDeviceRpcService, StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache,
TbApiUsageStateService statsService) {
super(actorContext, encodingService, deviceProfileCache, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
this.stateService = stateService;

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.queue;
import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@ -24,7 +23,6 @@ import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.queue.*;
import org.thingsboard.server.common.stats.StatsFactory;
@ -83,7 +81,7 @@ public class DefaultTbRuleEngineConsumerService extends AbstractConsumerService<
ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
TbRuleEngineDeviceRpcService tbDeviceRpcService,
StatsFactory statsFactory, TbDeviceProfileCache deviceProfileCache) {
super(actorContext, encodingService, deviceProfileCache, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbRuleEngineQueueFactory.createToRuleEngineNotificationsMsgConsumer());
this.statisticsService = statisticsService;
this.ruleEngineSettings = ruleEngineSettings;
this.tbRuleEngineQueueFactory = tbRuleEngineQueueFactory;

View File

@ -25,6 +25,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
@ -33,7 +34,9 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbPackCallback;
import org.thingsboard.server.service.queue.TbPackProcessingContext;
@ -59,15 +62,19 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
protected final ActorSystemContext actorContext;
protected final DataDecodingEncodingService encodingService;
protected final TbTenantProfileCache tenantProfileCache;
protected final TbDeviceProfileCache deviceProfileCache;
protected final TbApiUsageStateService apiUsageStateService;
protected final TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer;
public AbstractConsumerService(ActorSystemContext actorContext, DataDecodingEncodingService encodingService,
TbDeviceProfileCache deviceProfileCache, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
TbTenantProfileCache tenantProfileCache, TbDeviceProfileCache deviceProfileCache, TbApiUsageStateService apiUsageStateService, TbQueueConsumer<TbProtoQueueMsg<N>> nfConsumer) {
this.actorContext = actorContext;
this.encodingService = encodingService;
this.tenantProfileCache = tenantProfileCache;
this.deviceProfileCache = deviceProfileCache;
this.apiUsageStateService = apiUsageStateService;
this.nfConsumer = nfConsumer;
}
@ -143,7 +150,14 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
TbActorMsg actorMsg = actorMsgOpt.get();
if (actorMsg instanceof ComponentLifecycleMsg) {
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId());
tenantProfileCache.evict(tenantProfileId);
apiUsageStateService.onTenantProfileUpdate(tenantProfileId);
} else if (EntityType.TENANT.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
tenantProfileCache.evict(componentLifecycleMsg.getTenantId());
apiUsageStateService.onTenantUpdate(componentLifecycleMsg.getTenantId());
} else if (EntityType.DEVICE_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
deviceProfileCache.evict(componentLifecycleMsg.getTenantId(), new DeviceProfileId(componentLifecycleMsg.getEntityId().getId()));
} else if (EntityType.DEVICE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
deviceProfileCache.evict(new DeviceId(componentLifecycleMsg.getEntityId().getId()));

View File

@ -20,9 +20,12 @@ import org.thingsboard.server.common.data.id.TenantId;
public interface ApiUsageStateService {
ApiUsageState createDefaultApiUsageState(TenantId id);
ApiUsageState update(ApiUsageState apiUsageState);
ApiUsageState findTenantApiUsageState(TenantId tenantId);
void deleteApiUsageStateByTenantId(TenantId tenantId);
ApiUsageState createDefaultApiUsageState(TenantId id);
}

View File

@ -16,6 +16,8 @@
package org.thingsboard.server.common.data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
@ -27,8 +29,18 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan
private static final long serialVersionUID = 8250339805336035966L;
@Getter @Setter
private TenantId tenantId;
@Getter @Setter
private EntityId entityId;
@Getter @Setter
private boolean transportEnabled;
@Getter @Setter
private boolean dbStorageEnabled;
@Getter @Setter
private boolean ruleEngineEnabled;
@Getter @Setter
private boolean jsExecEnabled;
public ApiUsageState() {
super();
@ -43,22 +55,4 @@ public class ApiUsageState extends BaseData<ApiUsageStateId> implements HasTenan
this.tenantId = ur.getTenantId();
this.entityId = ur.getEntityId();
}
@Override
public TenantId getTenantId() {
return tenantId;
}
public void setTenantId(TenantId tenantId) {
this.tenantId = tenantId;
}
public EntityId getEntityId() {
return entityId;
}
public void setEntityId(EntityId entityId) {
this.entityId = entityId;
}
}

View File

@ -59,6 +59,14 @@ public class ApiApiUsageStateServiceImpl extends AbstractEntityService implement
return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
}
@Override
public ApiUsageState update(ApiUsageState apiUsageState) {
log.trace("Executing save [{}]", apiUsageState.getTenantId());
validateId(apiUsageState.getTenantId(), INCORRECT_TENANT_ID + apiUsageState.getTenantId());
validateId(apiUsageState.getId(), "Can't save new usage state. Only update is allowed!");
return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
}
@Override
public ApiUsageState findTenantApiUsageState(TenantId tenantId) {
log.trace("Executing findTenantUsageRecord, tenantId [{}]", tenantId);