Usage records

This commit is contained in:
Andrii Shvaika 2020-10-27 18:30:23 +02:00
parent b38b47f409
commit 03581c2941
22 changed files with 295 additions and 135 deletions

View File

@ -32,7 +32,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.thingsboard.rule.engine.api.MailService;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
import org.thingsboard.server.common.data.DataConstants;
@ -65,6 +64,8 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@ -106,6 +107,14 @@ public class ActorSystemContext {
return debugPerTenantLimits;
}
@Autowired
@Getter
private TbApiUsageStateService apiUsageStateService;
@Autowired
@Getter
private TbApiUsageClient apiUsageClient;
@Autowired
@Getter
@Setter

View File

@ -23,6 +23,7 @@ import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.TbEntityActorId;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
@ -46,6 +47,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.common.MultipleTbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.queue.common.TbQueueTbMsgCallbackWrapper;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.queue.TbClusterService;
import java.util.ArrayList;
@ -68,15 +70,16 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private final Map<RuleNodeId, List<RuleNodeRelation>> nodeRoutes;
private final RuleChainService service;
private final TbClusterService clusterService;
private final TbApiUsageClient apiUsageClient;
private String ruleChainName;
private RuleNodeId firstId;
private RuleNodeCtx firstNode;
private boolean started;
RuleChainActorMessageProcessor(TenantId tenantId, RuleChain ruleChain, ActorSystemContext systemContext
, TbActorRef parent, TbActorRef self) {
RuleChainActorMessageProcessor(TenantId tenantId, RuleChain ruleChain, ActorSystemContext systemContext, TbActorRef parent, TbActorRef self) {
super(systemContext, tenantId, ruleChain.getId());
this.apiUsageClient = systemContext.getApiUsageClient();
this.ruleChainName = ruleChain.getName();
this.parent = parent;
this.self = self;
@ -331,6 +334,7 @@ public class RuleChainActorMessageProcessor extends ComponentMsgProcessor<RuleCh
private void pushMsgToNode(RuleNodeCtx nodeCtx, TbMsg msg, String fromRelationType) {
if (nodeCtx != null) {
apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
nodeCtx.getSelfActor().tell(new RuleChainToRuleNodeMsg(new DefaultTbContext(systemContext, nodeCtx), msg, fromRelationType));
} else {
log.error("[{}][{}] RuleNodeCtx is empty", entityId, ruleChainName);

View File

@ -64,6 +64,12 @@ public abstract class RuleChainManagerActor extends ContextAwareActor {
}
}
protected void destroyRuleChains() {
for (RuleChain ruleChain : new PageDataIterable<>(link -> ruleChainService.findTenantRuleChains(tenantId, link), ContextAwareActor.ENTITY_PACK_LIMIT)) {
ctx.stop(new TbEntityActorId(ruleChain.getId()));
}
}
protected void visit(RuleChain entity, TbActorRef actorRef) {
if (entity != null && entity.isRoot()) {
rootChain = entity;

View File

@ -21,6 +21,7 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.TbActorRef;
import org.thingsboard.server.actors.shared.ComponentMsgProcessor;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleState;
@ -28,6 +29,7 @@ import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.msg.queue.PartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.RuleNodeException;
import org.thingsboard.server.common.msg.queue.RuleNodeInfo;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
/**
* @author Andrew Shvayka
@ -36,6 +38,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
private final String ruleChainName;
private final TbActorRef self;
private final TbApiUsageClient apiUsageClient;
private RuleNode ruleNode;
private TbNode tbNode;
private DefaultTbContext defaultCtx;
@ -44,6 +47,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
RuleNodeActorMessageProcessor(TenantId tenantId, String ruleChainName, RuleNodeId ruleNodeId, ActorSystemContext systemContext
, TbActorRef parent, TbActorRef self) {
super(systemContext, tenantId, ruleNodeId);
this.apiUsageClient = systemContext.getApiUsageClient();
this.ruleChainName = ruleChainName;
this.self = self;
this.ruleNode = systemContext.getRuleChainService().findRuleNodeById(tenantId, entityId);
@ -92,6 +96,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor<RuleNod
public void onRuleToSelfMsg(RuleNodeToSelfMsg msg) throws Exception {
checkActive(msg.getMsg());
apiUsageClient.report(tenantId, ApiUsageRecordKey.RE_EXEC_COUNT);
if (ruleNode.isDebugMode()) {
systemContext.persistDebugInput(tenantId, entityId, msg.getMsg(), "Self");
}

View File

@ -29,6 +29,7 @@ import org.thingsboard.server.actors.device.DeviceActorCreator;
import org.thingsboard.server.actors.ruleChain.RuleChainManagerActor;
import org.thingsboard.server.actors.service.ContextBasedCreator;
import org.thingsboard.server.actors.service.DefaultActorService;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
@ -57,6 +58,7 @@ public class TenantActor extends RuleChainManagerActor {
private boolean isRuleEngineForCurrentTenant;
private boolean isCore;
private ApiUsageState apiUsageState;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext, tenantId);
@ -74,19 +76,24 @@ public class TenantActor extends RuleChainManagerActor {
cantFindTenant = true;
log.info("[{}] Started tenant actor for missing tenant.", tenantId);
} else {
apiUsageState = new ApiUsageState(systemContext.getApiUsageStateService().getApiUsageState(tenant.getId()));
// This Service may be started for specific tenant only.
Optional<TenantId> isolatedTenantId = systemContext.getServiceInfoProvider().getIsolatedTenant();
TenantProfile tenantProfile = systemContext.getTenantProfileCache().get(tenant.getTenantProfileId());
isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE);
isRuleEngineForCurrentTenant = systemContext.getServiceInfoProvider().isService(ServiceType.TB_RULE_ENGINE);
if (isRuleEngineForCurrentTenant) {
try {
if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenantProfile.isIsolatedTbRuleEngine())) {
log.info("[{}] Going to init rule chains", tenantId);
initRuleChains();
if (apiUsageState.isReExecEnabled()) {
log.info("[{}] Going to init rule chains", tenantId);
initRuleChains();
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
}
} else {
isRuleEngineForCurrentTenant = false;
}
@ -98,8 +105,6 @@ public class TenantActor extends RuleChainManagerActor {
}
} catch (Exception e) {
log.warn("[{}] Unknown failure", tenantId, e);
// TODO: throw this in 3.1?
// throw new TbActorException("Failed to init actor", e);
}
}
@ -115,7 +120,7 @@ public class TenantActor extends RuleChainManagerActor {
if (msg.getMsgType().equals(MsgType.QUEUE_TO_RULE_ENGINE_MSG)) {
QueueToRuleEngineMsg queueMsg = (QueueToRuleEngineMsg) msg;
queueMsg.getTbMsg().getCallback().onSuccess();
} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)){
} else if (msg.getMsgType().equals(MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG)) {
TransportToDeviceActorMsgWrapper transportMsg = (TransportToDeviceActorMsgWrapper) msg;
transportMsg.getCallback().onSuccess();
}
@ -173,26 +178,33 @@ public class TenantActor extends RuleChainManagerActor {
return;
}
TbMsg tbMsg = msg.getTbMsg();
if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
if (apiUsageState.isReExecEnabled()) {
if (tbMsg.getRuleChainId() == null) {
if (getRootChainActor() != null) {
getRootChainActor().tell(msg);
} else {
tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
log.info("[{}] No Root Chain: {}", tenantId, msg);
}
} else {
tbMsg.getCallback().onFailure(new RuleEngineException("No Root Rule Chain available!"));
log.info("[{}] No Root Chain: {}", tenantId, msg);
try {
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
} catch (TbActorNotRegisteredException ex) {
log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
//TODO: 3.1 Log it to dead letters queue;
tbMsg.getCallback().onSuccess();
}
}
} else {
try {
ctx.tell(new TbEntityActorId(tbMsg.getRuleChainId()), msg);
} catch (TbActorNotRegisteredException ex) {
log.trace("Received message for non-existing rule chain: [{}]", tbMsg.getRuleChainId());
//TODO: 3.1 Log it to dead letters queue;
tbMsg.getCallback().onSuccess();
}
log.trace("[{}] Ack message because Rule Engine is disabled", tenantId);
tbMsg.getCallback().onSuccess();
}
}
private void onRuleChainMsg(RuleChainAwareMsg msg) {
getOrCreateActor(msg.getRuleChainId()).tell(msg);
if (apiUsageState.isReExecEnabled()) {
getOrCreateActor(msg.getRuleChainId()).tell(msg);
}
}
private void onToDeviceActorMsg(DeviceAwareMsg msg, boolean priority) {
@ -208,6 +220,17 @@ public class TenantActor extends RuleChainManagerActor {
}
private void onComponentLifecycleMsg(ComponentLifecycleMsg msg) {
if (msg.getEntityId().getEntityType().equals(EntityType.API_USAGE_STATE)) {
ApiUsageState old = apiUsageState;
apiUsageState = new ApiUsageState(systemContext.getApiUsageStateService().getApiUsageState(tenantId));
if (old.isReExecEnabled() && !apiUsageState.isReExecEnabled()) {
log.info("[{}] Received API state update. Going to DISABLE Rule Engine execution.", tenantId);
destroyRuleChains();
} else if (!old.isReExecEnabled() && apiUsageState.isReExecEnabled()) {
log.info("[{}] Received API state update. Going to ENABLE Rule Engine execution.", tenantId);
initRuleChains();
}
}
if (isRuleEngineForCurrentTenant) {
TbActorRef target = getEntityActorRef(msg.getEntityId());
if (target != null) {

View File

@ -15,6 +15,15 @@
*/
package org.thingsboard.server.service.apiusage;
import lombok.Getter;
public enum ApiFeature {
TRANSPORT, DB, RE, JS
TRANSPORT("transportApiState"), DB("dbApiState"), RE("ruleEngineApiState"), JS("jsExecutionApiState");
@Getter
private final String apiStateKey;
ApiFeature(String apiStateKey) {
this.apiStateKey = apiStateKey;
}
}

View File

@ -15,18 +15,24 @@
*/
package org.thingsboard.server.service.apiusage;
import com.google.common.util.concurrent.FutureCallback;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
@ -34,7 +40,6 @@ import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.dao.usagerecord.ApiUsageStateService;
import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
import org.thingsboard.server.gen.transport.TransportProtos.UsageStatsKVProto;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService;
@ -42,6 +47,7 @@ import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.profile.TbTenantProfileCache;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
@ -60,11 +66,23 @@ import java.util.concurrent.locks.ReentrantLock;
@Service
public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public static final String HOURLY = "HOURLY_";
public static final String HOURLY = "Hourly";
public static final FutureCallback<Void> VOID_CALLBACK = new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
}
@Override
public void onFailure(Throwable t) {
}
};
private final TbClusterService clusterService;
private final PartitionService partitionService;
private final ApiUsageStateService apiUsageStateService;
private final TimeseriesService tsService;
private final TelemetrySubscriptionService tsWsService;
private final SchedulerComponent scheduler;
private final TbTenantProfileCache tenantProfileCache;
@ -84,13 +102,14 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
public DefaultTbApiUsageStateService(TbClusterService clusterService,
PartitionService partitionService,
ApiUsageStateService apiUsageStateService,
TimeseriesService tsService,
TimeseriesService tsService, TelemetrySubscriptionService tsWsService,
SchedulerComponent scheduler,
TbTenantProfileCache tenantProfileCache) {
this.clusterService = clusterService;
this.partitionService = partitionService;
this.apiUsageStateService = apiUsageStateService;
this.tsService = tsService;
this.tsWsService = tsWsService;
this.scheduler = scheduler;
this.tenantProfileCache = tenantProfileCache;
}
@ -122,9 +141,9 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) {
ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey());
long newValue = tenantState.add(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.name(), newValue)));
updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue)));
long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue());
updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(HOURLY + recordKey.name(), newHourlyValue)));
updatedEntries.add(new BasicTsKvEntry(hourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue)));
Pair<ApiFeature, Boolean> update = tenantState.checkStateUpdatedDueToThreshold(recordKey);
if (update != null) {
result.put(update.getFirst(), update.getSecond());
@ -133,10 +152,11 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
} finally {
updateLock.unlock();
}
tsService.save(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, 0L);
tsWsService.saveAndNotify(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, 0L, VOID_CALLBACK);
if (!result.isEmpty()) {
persistAndNotify(tenantState, result);
}
callback.onSuccess();
}
@Override
@ -149,27 +169,38 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
@Override
public ApiUsageState getApiUsageState(TenantId tenantId) {
if (partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
TenantApiUsageState state = getOrFetchState(tenantId);
return state.getApiUsageState();
TenantApiUsageState tenantState = myTenantStates.get(tenantId);
if (tenantState != null) {
return tenantState.getApiUsageState();
} else {
ApiUsageState state = otherTenantStates.get(tenantId);
if (state == null) {
updateLock.lock();
try {
state = otherTenantStates.get(tenantId);
if (state == null) {
state = apiUsageStateService.findTenantApiUsageState(tenantId);
otherTenantStates.put(tenantId, state);
if (state != null) {
return state;
} else {
if (partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).isMyPartition()) {
return getOrFetchState(tenantId).getApiUsageState();
} else {
updateLock.lock();
try {
state = otherTenantStates.get(tenantId);
if (state == null) {
state = apiUsageStateService.findTenantApiUsageState(tenantId);
otherTenantStates.put(tenantId, state);
}
} finally {
updateLock.unlock();
}
} finally {
updateLock.unlock();
return state;
}
}
return state;
}
}
@Override
public void onApiUsageStateUpdate(TenantId tenantId) {
otherTenantStates.remove(tenantId);
}
@Override
public void onTenantProfileUpdate(TenantProfileId tenantProfileId) {
TenantProfile tenantProfile = tenantProfileCache.get(tenantProfileId);
@ -199,29 +230,38 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
}
}
@Override
public void onApiUsageStateUpdate(TenantId tenantId) {
}
private void updateTenantState(TenantApiUsageState state, TenantProfile tenantProfile) {
TenantProfileData oldProfileData = state.getTenantProfileData();
state.setTenantProfileData(tenantProfile.getProfileData());
Map<ApiFeature, Boolean> result = state.checkStateUpdatedDueToThresholds();
if (!result.isEmpty()) {
persistAndNotify(state, result);
}
updateProfileThresholds(state.getTenantId(), state.getApiUsageState().getId(),
oldProfileData.getConfiguration(), tenantProfile.getProfileData().getConfiguration());
}
private void updateProfileThresholds(TenantId tenantId, ApiUsageStateId id,
TenantProfileConfiguration oldData, TenantProfileConfiguration newData) {
long ts = System.currentTimeMillis();
List<TsKvEntry> profileThresholds = new ArrayList<>();
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
if (oldData == null || oldData.getProfileThreshold(key) != newData.getProfileThreshold(key)) {
profileThresholds.add(new BasicTsKvEntry(ts, new LongDataEntry(key.getApiLimitKey(), newData.getProfileThreshold(key))));
}
}
if (!profileThresholds.isEmpty()) {
tsWsService.saveAndNotify(tenantId, id, profileThresholds, 0L, VOID_CALLBACK);
}
}
private void persistAndNotify(TenantApiUsageState state, Map<ApiFeature, Boolean> result) {
// TODO:
// 1. Broadcast to everyone notifications about enabled/disabled features.
// 2. Report rule engine and js executor metrics
// 4. UI for configuration of the thresholds
// 5. Max rule node executions per message.
apiUsageStateService.update(state.getApiUsageState());
if (result.containsKey(ApiFeature.TRANSPORT)) {
clusterService.onApiStateChange(state.getApiUsageState(), null);
}
clusterService.onApiStateChange(state.getApiUsageState(), null);
long ts = System.currentTimeMillis();
List<TsKvEntry> stateTelemetry = new ArrayList<>();
result.forEach(((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new BooleanDataEntry(apiFeature.getApiStateKey(), aState)))));
tsWsService.saveAndNotify(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, 0L, VOID_CALLBACK);
}
private void checkStartOfNextCycle() {
@ -252,15 +292,15 @@ public class DefaultTbApiUsageStateService implements TbApiUsageStateService {
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
tenantState = new TenantApiUsageState(tenantProfile, dbStateEntity);
try {
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getEntityId()).get();
List<TsKvEntry> dbValues = tsService.findAllLatest(tenantId, dbStateEntity.getId()).get();
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
boolean cycleEntryFound = false;
boolean hourlyEntryFound = false;
for (TsKvEntry tsKvEntry : dbValues) {
if (tsKvEntry.getKey().equals(key.name())) {
if (tsKvEntry.getKey().equals(key.getApiCountKey())) {
cycleEntryFound = true;
tenantState.put(key, tsKvEntry.getLongValue().get());
} else if (tsKvEntry.getKey().equals(HOURLY + key.name())) {
} else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) {
hourlyEntryFound = true;
if (tsKvEntry.getTs() == tenantState.getCurrentHourTs()) {
tenantState.putHourly(key, tsKvEntry.getLongValue().get());

View File

@ -21,11 +21,10 @@ import org.springframework.data.util.Pair;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.msg.tools.SchedulerUtils;
import java.util.HashMap;
@ -101,30 +100,13 @@ public class TenantApiUsageState {
}
public long getProfileThreshold(ApiUsageRecordKey key) {
DefaultTenantProfileConfiguration config = (DefaultTenantProfileConfiguration) tenantProfileData.getConfiguration();
switch (key) {
case TRANSPORT_MSG_COUNT:
return config.getMaxTransportMessages();
case TRANSPORT_DP_COUNT:
return config.getMaxTransportDataPoints();
case JS_EXEC_COUNT:
return config.getMaxJSExecutions();
case RE_EXEC_COUNT:
return config.getMaxREExecutions();
case STORAGE_DP_COUNT:
return config.getMaxDPStorageDays();
}
return 0L;
return tenantProfileData.getConfiguration().getProfileThreshold(key);
}
public TenantId getTenantId() {
return apiUsageState.getTenantId();
}
public EntityId getEntityId() {
return apiUsageState.getEntityId();
}
public boolean isTransportEnabled() {
return apiUsageState.isTransportEnabled();
}

View File

@ -129,8 +129,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
tenantProfileService.findOrCreateDefaultTenantProfile(TenantId.SYS_TENANT_ID);
TenantProfileData tenantProfileData = new TenantProfileData();
DefaultTenantProfileConfiguration configuration = new DefaultTenantProfileConfiguration();
tenantProfileData.setConfiguration(configuration);
tenantProfileData.setConfiguration(new DefaultTenantProfileConfiguration());
TenantProfile isolatedTbCoreProfile = new TenantProfile();
isolatedTbCoreProfile.setDefault(false);

View File

@ -21,7 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import java.util.Map;
import java.util.UUID;
@ -36,13 +37,15 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public abstract class AbstractJsInvokeService implements JsInvokeService {
private final TbUsageStatsClient apiUsageStatsClient;
private final TbApiUsageStateService apiUsageStateService;
private final TbApiUsageClient apiUsageClient;
protected ScheduledExecutorService timeoutExecutorService;
protected Map<UUID, String> scriptIdToNameMap = new ConcurrentHashMap<>();
protected Map<UUID, BlackListInfo> blackListedFunctions = new ConcurrentHashMap<>();
protected Map<UUID, DisableListInfo> disabledFunctions = new ConcurrentHashMap<>();
protected AbstractJsInvokeService(TbUsageStatsClient apiUsageStatsClient) {
this.apiUsageStatsClient = apiUsageStatsClient;
protected AbstractJsInvokeService(TbApiUsageStateService apiUsageStateService, TbApiUsageClient apiUsageClient) {
this.apiUsageStateService = apiUsageStateService;
this.apiUsageClient = apiUsageClient;
}
public void init(long maxRequestsTimeout) {
@ -59,24 +62,32 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
@Override
public ListenableFuture<UUID> eval(TenantId tenantId, JsScriptType scriptType, String scriptBody, String... argNames) {
UUID scriptId = UUID.randomUUID();
String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
return doEval(scriptId, functionName, jsScript);
if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) {
UUID scriptId = UUID.randomUUID();
String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_');
String jsScript = generateJsScript(scriptType, functionName, scriptBody, argNames);
return doEval(scriptId, functionName, jsScript);
} else {
return Futures.immediateFailedFuture(new RuntimeException("JS Execution is disabled due to API limits!"));
}
}
@Override
public ListenableFuture<Object> invokeFunction(TenantId tenantId, UUID scriptId, Object... args) {
String functionName = scriptIdToNameMap.get(scriptId);
if (functionName == null) {
return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
}
if (!isBlackListed(scriptId)) {
apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
return doInvokeFunction(scriptId, functionName, args);
if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) {
String functionName = scriptIdToNameMap.get(scriptId);
if (functionName == null) {
return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!"));
}
if (!isDisabled(scriptId)) {
apiUsageClient.report(tenantId, ApiUsageRecordKey.JS_EXEC_COUNT, 1);
return doInvokeFunction(scriptId, functionName, args);
} else {
return Futures.immediateFailedFuture(
new RuntimeException("Script invocation is blocked due to maximum error count " + getMaxErrors() + "!"));
}
} else {
return Futures.immediateFailedFuture(
new RuntimeException("Script is blacklisted due to maximum error count " + getMaxErrors() + "!"));
return Futures.immediateFailedFuture(new RuntimeException("JS Execution is disabled due to API limits!"));
}
}
@ -86,7 +97,7 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
if (functionName != null) {
try {
scriptIdToNameMap.remove(scriptId);
blackListedFunctions.remove(scriptId);
disabledFunctions.remove(scriptId);
doRelease(scriptId, functionName);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
@ -106,7 +117,7 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
protected abstract long getMaxBlacklistDuration();
protected void onScriptExecutionError(UUID scriptId) {
blackListedFunctions.computeIfAbsent(scriptId, key -> new BlackListInfo()).incrementAndGet();
disabledFunctions.computeIfAbsent(scriptId, key -> new DisableListInfo()).incrementAndGet();
}
private String generateJsScript(JsScriptType scriptType, String functionName, String scriptBody, String... argNames) {
@ -116,11 +127,11 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
throw new RuntimeException("No script factory implemented for scriptType: " + scriptType);
}
private boolean isBlackListed(UUID scriptId) {
BlackListInfo errorCount = blackListedFunctions.get(scriptId);
private boolean isDisabled(UUID scriptId) {
DisableListInfo errorCount = disabledFunctions.get(scriptId);
if (errorCount != null) {
if (errorCount.getExpirationTime() <= System.currentTimeMillis()) {
blackListedFunctions.remove(scriptId);
disabledFunctions.remove(scriptId);
return false;
} else {
return errorCount.get() >= getMaxErrors();
@ -130,11 +141,11 @@ public abstract class AbstractJsInvokeService implements JsInvokeService {
}
}
private class BlackListInfo {
private class DisableListInfo {
private final AtomicInteger counter;
private long expirationTime;
private BlackListInfo() {
private DisableListInfo() {
this.counter = new AtomicInteger(0);
}

View File

@ -24,10 +24,10 @@ import delight.nashornsandbox.NashornSandboxes;
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@ -65,8 +65,8 @@ public abstract class AbstractNashornJsInvokeService extends AbstractJsInvokeSer
@Value("${js.local.stats.enabled:false}")
private boolean statsEnabled;
public AbstractNashornJsInvokeService(TbUsageStatsClient apiUsageStatsClient, JsExecutorService jsExecutor) {
super(apiUsageStatsClient);
public AbstractNashornJsInvokeService(TbApiUsageStateService apiUsageStateService, TbApiUsageClient apiUsageClient, JsExecutorService jsExecutor) {
super(apiUsageStateService, apiUsageClient);
this.jsExecutor = jsExecutor;
}

View File

@ -19,7 +19,8 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import java.util.concurrent.TimeUnit;
@ -43,8 +44,8 @@ public class NashornJsInvokeService extends AbstractNashornJsInvokeService {
@Value("${js.local.max_black_list_duration_sec:60}")
private int maxBlackListDurationSec;
public NashornJsInvokeService(TbUsageStatsClient apiUsageStatsClient, JsExecutorService jsExecutor) {
super(apiUsageStatsClient, jsExecutor);
public NashornJsInvokeService(TbApiUsageStateService apiUsageStateService, TbApiUsageClient apiUsageClient, JsExecutorService jsExecutor) {
super(apiUsageStateService, apiUsageClient, jsExecutor);
}
@Override

View File

@ -30,7 +30,8 @@ import org.thingsboard.server.gen.js.JsInvokeProtos;
import org.thingsboard.server.queue.TbQueueRequestTemplate;
import org.thingsboard.server.queue.common.TbProtoJsQueueMsg;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
@ -69,8 +70,8 @@ public class RemoteJsInvokeService extends AbstractJsInvokeService {
private final AtomicInteger queueFailedMsgs = new AtomicInteger(0);
private final AtomicInteger queueTimeoutMsgs = new AtomicInteger(0);
public RemoteJsInvokeService(TbUsageStatsClient apiUsageStatsClient) {
super(apiUsageStatsClient);
public RemoteJsInvokeService(TbApiUsageStateService apiUsageStateService, TbApiUsageClient apiUsageClient) {
super(apiUsageStateService, apiUsageClient);
}
@Scheduled(fixedDelayString = "${js.remote.stats.print_interval_ms}")

View File

@ -15,12 +15,24 @@
*/
package org.thingsboard.server.common.data;
import lombok.Getter;
public enum ApiUsageRecordKey {
TRANSPORT_MSG_COUNT,
TRANSPORT_DP_COUNT,
STORAGE_DP_COUNT,
RE_EXEC_COUNT,
JS_EXEC_COUNT
TRANSPORT_MSG_COUNT("transportMsgCount", "transportMsgLimit"),
TRANSPORT_DP_COUNT("transportDataPointsCount", "transportDataPointsLimit"),
STORAGE_DP_COUNT("storageDataPointsCount", "storageDataPointsLimit"),
RE_EXEC_COUNT("ruleEngineExecutionCount", "ruleEngineExecutionLimit"),
JS_EXEC_COUNT("jsExecutionCount", "jsExecutionLimit");
@Getter
private final String apiCountKey;
@Getter
private final String apiLimitKey;
ApiUsageRecordKey(String apiCountKey, String apiLimitKey) {
this.apiCountKey = apiCountKey;
this.apiLimitKey = apiLimitKey;
}
}

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.common.data.tenant.profile;
import lombok.Data;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.TenantProfileType;
@Data
@ -38,6 +39,24 @@ public class DefaultTenantProfileConfiguration implements TenantProfileConfigura
private long maxDPStorageDays;
private int maxRuleNodeExecutionsPerMessage;
@Override
public long getProfileThreshold(ApiUsageRecordKey key) {
switch (key) {
case TRANSPORT_MSG_COUNT:
return maxTransportMessages;
case TRANSPORT_DP_COUNT:
return maxTransportDataPoints;
case JS_EXEC_COUNT:
return maxJSExecutions;
case RE_EXEC_COUNT:
return maxREExecutions;
case STORAGE_DP_COUNT:
return maxDPStorageDays;
}
return 0L;
}
@Override
public TenantProfileType getType() {
return TenantProfileType.DEFAULT;

View File

@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.TenantProfileType;
@JsonIgnoreProperties(ignoreUnknown = true)
@ -33,4 +34,7 @@ public interface TenantProfileConfiguration {
@JsonIgnore
TenantProfileType getType();
@JsonIgnore
long getProfileThreshold(ApiUsageRecordKey key);
}

View File

@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Component
@Slf4j
public class DefaultTbUsageStatsClient implements TbUsageStatsClient {
public class DefaultTbApiUsageClient implements TbApiUsageClient {
@Value("${usage.stats.report.enabled:true}")
private boolean enabled;
@ -53,7 +53,7 @@ public class DefaultTbUsageStatsClient implements TbUsageStatsClient {
private final TbQueueProducerProvider producerProvider;
private TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> msgProducer;
public DefaultTbUsageStatsClient(PartitionService partitionService, SchedulerComponent scheduler, TbQueueProducerProvider producerProvider) {
public DefaultTbApiUsageClient(PartitionService partitionService, SchedulerComponent scheduler, TbQueueProducerProvider producerProvider) {
this.partitionService = partitionService;
this.scheduler = scheduler;
this.producerProvider = producerProvider;
@ -109,4 +109,8 @@ public class DefaultTbUsageStatsClient implements TbUsageStatsClient {
}
}
@Override
public void report(TenantId tenantId, ApiUsageRecordKey key) {
report(tenantId, key, 1L);
}
}

View File

@ -18,9 +18,10 @@ package org.thingsboard.server.queue.usagestats;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.id.TenantId;
public interface TbUsageStatsClient {
public interface TbApiUsageClient {
void report(TenantId tenantId, ApiUsageRecordKey key, long value);
void report(TenantId tenantId, ApiUsageRecordKey key);
}

View File

@ -78,7 +78,7 @@ import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.provider.TbTransportQueueFactory;
import org.thingsboard.server.queue.scheduler.SchedulerComponent;
import org.thingsboard.server.queue.usagestats.TbUsageStatsClient;
import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
import org.thingsboard.server.queue.util.TbTransportComponent;
import javax.annotation.PostConstruct;
@ -123,7 +123,7 @@ public class DefaultTransportService implements TransportService {
private final StatsFactory statsFactory;
private final TransportDeviceProfileCache deviceProfileCache;
private final TransportTenantProfileCache tenantProfileCache;
private final TbUsageStatsClient apiUsageStatsClient;
private final TbApiUsageClient apiUsageClient;
private final TransportRateLimitService rateLimitService;
private final DataDecodingEncodingService dataDecodingEncodingService;
private final SchedulerComponent scheduler;
@ -152,7 +152,7 @@ public class DefaultTransportService implements TransportService {
StatsFactory statsFactory,
TransportDeviceProfileCache deviceProfileCache,
TransportTenantProfileCache tenantProfileCache,
TbUsageStatsClient apiUsageStatsClient, TransportRateLimitService rateLimitService,
TbApiUsageClient apiUsageClient, TransportRateLimitService rateLimitService,
DataDecodingEncodingService dataDecodingEncodingService, SchedulerComponent scheduler) {
this.serviceInfoProvider = serviceInfoProvider;
this.queueProvider = queueProvider;
@ -161,7 +161,7 @@ public class DefaultTransportService implements TransportService {
this.statsFactory = statsFactory;
this.deviceProfileCache = deviceProfileCache;
this.tenantProfileCache = tenantProfileCache;
this.apiUsageStatsClient = apiUsageStatsClient;
this.apiUsageClient = apiUsageClient;
this.rateLimitService = rateLimitService;
this.dataDecodingEncodingService = dataDecodingEncodingService;
this.scheduler = scheduler;
@ -647,6 +647,7 @@ public class DefaultTransportService implements TransportService {
if (stateOpt.isPresent()) {
ApiUsageState apiUsageState = stateOpt.get();
rateLimitService.update(apiUsageState.getTenantId(), apiUsageState.isTransportEnabled());
//TODO: if transport is disabled, we should close all sessions and not to check credentials.
}
}
} else if (toSessionMsg.hasEntityDeleteMsg()) {
@ -821,8 +822,8 @@ public class DefaultTransportService implements TransportService {
@Override
public void onSuccess(T msg) {
try {
apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1);
apiUsageStatsClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints);
apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1);
apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints);
} finally {
callback.onSuccess(msg);
}

View File

@ -51,4 +51,5 @@ public interface TenantProfileRepository extends PagingAndSortingRepository<Tena
"FROM TenantProfileEntity t " +
"WHERE t.isDefault = true")
EntityInfo findDefaultTenantProfileInfo();
}

View File

@ -37,4 +37,5 @@ public interface TenantProfileDao extends Dao<TenantProfile> {
TenantProfile findDefaultTenantProfile(TenantId tenantId);
EntityInfo findDefaultTenantProfileInfo(TenantId tenantId);
}

View File

@ -17,15 +17,26 @@ package org.thingsboard.server.dao.usagerecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.TenantProfile;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileConfiguration;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.dao.tenant.TenantProfileDao;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import java.util.ArrayList;
import java.util.List;
import static org.thingsboard.server.dao.service.Validator.validateId;
@ -35,11 +46,15 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
public static final String INCORRECT_TENANT_ID = "Incorrect tenantId ";
private final ApiUsageStateDao apiUsageStateDao;
private final TenantProfileDao tenantProfileDao;
private final TenantDao tenantDao;
private final TimeseriesService tsService;
public ApiUsageStateServiceImpl(TenantDao tenantDao, ApiUsageStateDao apiUsageStateDao) {
public ApiUsageStateServiceImpl(TenantDao tenantDao, ApiUsageStateDao apiUsageStateDao, TenantProfileDao tenantProfileDao, TimeseriesService tsService) {
this.tenantDao = tenantDao;
this.apiUsageStateDao = apiUsageStateDao;
this.tenantProfileDao = tenantProfileDao;
this.tsService = tsService;
}
@Override
@ -57,7 +72,18 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
apiUsageState.setTenantId(tenantId);
apiUsageState.setEntityId(tenantId);
apiUsageStateValidator.validate(apiUsageState, ApiUsageState::getTenantId);
return apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
ApiUsageState saved = apiUsageStateDao.save(apiUsageState.getTenantId(), apiUsageState);
Tenant tenant = tenantDao.findById(tenantId, tenantId.getId());
TenantProfile tenantProfile = tenantProfileDao.findById(tenantId, tenant.getTenantProfileId().getId());
TenantProfileConfiguration configuration = tenantProfile.getProfileData().getConfiguration();
List<TsKvEntry> profileThresholds = new ArrayList<>();
for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) {
profileThresholds.add(new BasicTsKvEntry(saved.getCreatedTime(), new LongDataEntry(key.getApiLimitKey(), configuration.getProfileThreshold(key))));
}
tsService.save(tenantId, saved.getId(), profileThresholds, 0L);
return saved;
}
@Override
@ -80,7 +106,8 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A
log.trace("Executing findApiUsageStateById, tenantId [{}], apiUsageStateId [{}]", tenantId, id);
validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
validateId(id, "Incorrect apiUsageStateId " + id);
return apiUsageStateDao.findById(tenantId, id.getId()); }
return apiUsageStateDao.findById(tenantId, id.getId());
}
private DataValidator<ApiUsageState> apiUsageStateValidator =
new DataValidator<ApiUsageState>() {