diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java index 573310f7cd..9fd33a91d7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/RuleNodeActorMessageProcessor.java @@ -106,7 +106,7 @@ public class RuleNodeActorMessageProcessor extends ComponentMsgProcessor reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse)); + deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse), currentUser); } @Override diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index daca2d1a5d..d311cdbfed 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -48,6 +48,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -449,7 +450,7 @@ public class TelemetryController extends BaseController { TenantProfile tenantProfile = tenantProfileCache.get(tenantId); tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); } - tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback() { + tsSubService.saveAndNotify(tenantId, user.getCustomerId(), entityId, entries, tenantTtl, new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { logTelemetryUpdated(user, entityId, entries, null); diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java new file mode 100644 index 0000000000..37a2d986ac --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/BaseApiUsageState.java @@ -0,0 +1,150 @@ +package org.thingsboard.server.service.apiusage; + +import lombok.Getter; +import org.springframework.data.util.Pair; +import org.thingsboard.server.common.data.ApiFeature; +import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.tools.SchedulerUtils; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +public abstract class BaseApiUsageState { + private final Map currentCycleValues = new ConcurrentHashMap<>(); + private final Map currentHourValues = new ConcurrentHashMap<>(); + + @Getter + private final ApiUsageState apiUsageState; + @Getter + private volatile long currentCycleTs; + @Getter + private volatile long nextCycleTs; + @Getter + private volatile long currentHourTs; + + public BaseApiUsageState(ApiUsageState apiUsageState) { + this.apiUsageState = apiUsageState; + this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); + this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); + this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); + } + + public void put(ApiUsageRecordKey key, Long value) { + currentCycleValues.put(key, value); + } + + public void putHourly(ApiUsageRecordKey key, Long value) { + currentHourValues.put(key, value); + } + + public long add(ApiUsageRecordKey key, long value) { + long result = currentCycleValues.getOrDefault(key, 0L) + value; + currentCycleValues.put(key, result); + return result; + } + + public long get(ApiUsageRecordKey key) { + return currentCycleValues.getOrDefault(key, 0L); + } + + public long addToHourly(ApiUsageRecordKey key, long value) { + long result = currentHourValues.getOrDefault(key, 0L) + value; + currentHourValues.put(key, result); + return result; + } + + public void setHour(long currentHourTs) { + this.currentHourTs = currentHourTs; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + currentHourValues.put(key, 0L); + } + } + + public void setCycles(long currentCycleTs, long nextCycleTs) { + this.currentCycleTs = currentCycleTs; + this.nextCycleTs = nextCycleTs; + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + currentCycleValues.put(key, 0L); + } + } + + public ApiUsageStateValue getFeatureValue(ApiFeature feature) { + switch (feature) { + case TRANSPORT: + return apiUsageState.getTransportState(); + case RE: + return apiUsageState.getReExecState(); + case DB: + return apiUsageState.getDbStorageState(); + case JS: + return apiUsageState.getJsExecState(); + case EMAIL: + return apiUsageState.getEmailExecState(); + case SMS: + return apiUsageState.getSmsExecState(); + default: + return ApiUsageStateValue.ENABLED; + } + } + + public boolean setFeatureValue(ApiFeature feature, ApiUsageStateValue value) { + ApiUsageStateValue currentValue = getFeatureValue(feature); + switch (feature) { + case TRANSPORT: + apiUsageState.setTransportState(value); + break; + case RE: + apiUsageState.setReExecState(value); + break; + case DB: + apiUsageState.setDbStorageState(value); + break; + case JS: + apiUsageState.setJsExecState(value); + break; + case EMAIL: + apiUsageState.setEmailExecState(value); + break; + case SMS: + apiUsageState.setSmsExecState(value); + break; + } + return !currentValue.equals(value); + } + + public Map checkStateUpdatedDueToThresholds() { + return checkStateUpdatedDueToThreshold(new HashSet<>(Arrays.asList(ApiFeature.values()))); + } + + public Map checkStateUpdatedDueToThreshold(Set features) { + Map result = new HashMap<>(); + for (ApiFeature feature : features) { + Pair tmp = checkStateUpdatedDueToThreshold(feature); + if (tmp != null) { + result.put(tmp.getFirst(), tmp.getSecond()); + } + } + return result; + } + + public abstract Pair checkStateUpdatedDueToThreshold(ApiFeature feature); + + public abstract EntityType getEntityType(); + + public TenantId getTenantId() { + return getApiUsageState().getTenantId(); + } + + public EntityId getEntityId() { + return getApiUsageState().getEntityId(); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/CustomerApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/CustomerApiUsageState.java new file mode 100644 index 0000000000..76b3e63fc6 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/CustomerApiUsageState.java @@ -0,0 +1,24 @@ +package org.thingsboard.server.service.apiusage; + +import org.springframework.data.util.Pair; +import org.thingsboard.server.common.data.ApiFeature; +import org.thingsboard.server.common.data.ApiUsageState; +import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; + +public class CustomerApiUsageState extends BaseApiUsageState { + public CustomerApiUsageState(ApiUsageState apiUsageState) { + super(apiUsageState); + } + + @Override + public Pair checkStateUpdatedDueToThreshold(ApiFeature feature) { + ApiUsageStateValue featureValue = ApiUsageStateValue.ENABLED; + return setFeatureValue(feature, featureValue) ? Pair.of(feature, featureValue) : null; + } + + @Override + public EntityType getEntityType() { + return EntityType.CUSTOMER; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index c2549803e5..fb77a65255 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -29,10 +29,13 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateMailMessage; import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -66,6 +69,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -106,9 +110,9 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener myTenantStates = new ConcurrentHashMap<>(); + private final Map myUsageStates = new ConcurrentHashMap<>(); // Tenants that should be processed on other servers - private final Map otherTenantStates = new ConcurrentHashMap<>(); + private final Map otherUsageStates = new ConcurrentHashMap<>(); @Value("${usage.stats.report.enabled:true}") private boolean enabled; @@ -151,60 +155,73 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener msg, TbCallback callback) { ToUsageStatsServiceMsg statsMsg = msg.getValue(); TenantId tenantId = new TenantId(new UUID(statsMsg.getTenantIdMSB(), statsMsg.getTenantIdLSB())); - - if (tenantProfileCache.get(tenantId) == null) { - return; + CustomerId customerId; + if (statsMsg.getCustomerIdMSB() != 0 && statsMsg.getCustomerIdLSB() != 0) { + customerId = new CustomerId(new UUID(statsMsg.getCustomerIdMSB(), statsMsg.getCustomerIdLSB())); + } else { + customerId = new CustomerId(EntityId.NULL_UUID); } - TenantApiUsageState tenantState; + processEntityUsageStats(tenantId, customerId.isNullUid() ? tenantId : customerId, statsMsg.getValuesList()); + callback.onSuccess(); + } + + private void processEntityUsageStats(TenantId tenantId, EntityId entityId, List values) { + if (tenantProfileCache.get(tenantId) == null) return; + + BaseApiUsageState usageState; List updatedEntries; Map result; + updateLock.lock(); try { - tenantState = getOrFetchState(tenantId); - long ts = tenantState.getCurrentCycleTs(); - long hourTs = tenantState.getCurrentHourTs(); + usageState = getOrFetchState(tenantId, entityId); + long ts = usageState.getCurrentCycleTs(); + long hourTs = usageState.getCurrentHourTs(); long newHourTs = SchedulerUtils.getStartOfCurrentHour(); if (newHourTs != hourTs) { - tenantState.setHour(newHourTs); + usageState.setHour(newHourTs); } updatedEntries = new ArrayList<>(ApiUsageRecordKey.values().length); Set apiFeatures = new HashSet<>(); - for (UsageStatsKVProto kvProto : statsMsg.getValuesList()) { + for (UsageStatsKVProto kvProto : values) { ApiUsageRecordKey recordKey = ApiUsageRecordKey.valueOf(kvProto.getKey()); - long newValue = tenantState.add(recordKey, kvProto.getValue()); + long newValue = usageState.add(recordKey, kvProto.getValue()); updatedEntries.add(new BasicTsKvEntry(ts, new LongDataEntry(recordKey.getApiCountKey(), newValue))); - long newHourlyValue = tenantState.addToHourly(recordKey, kvProto.getValue()); + long newHourlyValue = usageState.addToHourly(recordKey, kvProto.getValue()); updatedEntries.add(new BasicTsKvEntry(newHourTs, new LongDataEntry(recordKey.getApiCountKey() + HOURLY, newHourlyValue))); apiFeatures.add(recordKey.getApiFeature()); } - result = tenantState.checkStateUpdatedDueToThreshold(apiFeatures); + result = usageState.checkStateUpdatedDueToThreshold(apiFeatures); } finally { updateLock.unlock(); } - tsWsService.saveAndNotifyInternal(tenantId, tenantState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); + tsWsService.saveAndNotifyInternal(tenantId, usageState.getApiUsageState().getId(), updatedEntries, VOID_CALLBACK); if (!result.isEmpty()) { - persistAndNotify(tenantState, result); + persistAndNotify(usageState, result); } - callback.onSuccess(); } @Override protected void onTbApplicationEvent(PartitionChangeEvent partitionChangeEvent) { if (partitionChangeEvent.getServiceType().equals(ServiceType.TB_CORE)) { - myTenantStates.entrySet().removeIf(entry -> !partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); - otherTenantStates.entrySet().removeIf(entry -> partitionService.resolve(ServiceType.TB_CORE, entry.getKey(), entry.getKey()).isMyPartition()); + myUsageStates.entrySet().removeIf(entry -> { + return !partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); + }); + otherUsageStates.entrySet().removeIf(entry -> { + return partitionService.resolve(ServiceType.TB_CORE, entry.getValue().getTenantId(), entry.getKey()).isMyPartition(); + }); initStatesFromDataBase(); } } @Override public ApiUsageState getApiUsageState(TenantId tenantId) { - TenantApiUsageState tenantState = myTenantStates.get(tenantId); + TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId); if (tenantState != null) { return tenantState.getApiUsageState(); } else { - ApiUsageState state = otherTenantStates.get(tenantId); + ApiUsageState state = otherUsageStates.get(tenantId); if (state != null) { return state; } else { @@ -213,11 +230,11 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { - if (tenantProfile.getId().equals(state.getTenantProfileId())) { - updateTenantState(state, tenantProfile); - } - }); + myUsageStates.values().stream() + .filter(state -> state.getEntityType() == EntityType.TENANT) + .map(state -> (TenantApiUsageState) state) + .forEach(state -> { + if (tenantProfile.getId().equals(state.getTenantProfileId())) { + updateTenantState(state, tenantProfile); + } + }); } finally { updateLock.unlock(); } @@ -256,7 +276,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener result) { - log.info("[{}] Detected update of the API state: {}", state.getTenantId(), result); + private void persistAndNotify(BaseApiUsageState state, Map result) { + log.info("[{}] Detected update of the API state for {}: {}", state.getEntityId(), state.getEntityType(), result); apiUsageStateService.update(state.getApiUsageState()); clusterService.onApiStateChange(state.getApiUsageState(), null); long ts = System.currentTimeMillis(); @@ -302,20 +322,21 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name()))))); tsWsService.saveAndNotifyInternal(state.getTenantId(), state.getApiUsageState().getId(), stateTelemetry, VOID_CALLBACK); - String email = tenantService.findTenantById(state.getTenantId()).getEmail(); - - if (StringUtils.isNotEmpty(email)) { - result.forEach((apiFeature, stateValue) -> { - mailExecutor.submit(() -> { - try { - mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage(state, apiFeature, stateValue)); - } catch (ThingsboardException e) { - log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e); - } + if (state.getEntityType() == EntityType.TENANT) { + String email = tenantService.findTenantById(state.getTenantId()).getEmail(); + if (StringUtils.isNotEmpty(email)) { + result.forEach((apiFeature, stateValue) -> { + mailExecutor.submit(() -> { + try { + mailService.sendApiFeatureStateEmail(apiFeature, stateValue, email, createStateMailMessage((TenantApiUsageState) state, apiFeature, stateValue)); + } catch (ThingsboardException e) { + log.warn("[{}] Can't send update of the API state to tenant with provided email [{}]", state.getTenantId(), email, e); + } + }); }); - }); - } else { - log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId()); + } else { + log.warn("[{}] Can't send update of the API state to tenant with empty email!", state.getTenantId()); + } } } @@ -350,12 +371,13 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { + myUsageStates.values().forEach(state -> { if ((state.getNextCycleTs() < now) && (now - state.getNextCycleTs() < TimeUnit.HOURS.toMillis(1))) { + // FIXME TenantId tenantId = state.getTenantId(); state.setCycles(state.getNextCycleTs(), SchedulerUtils.getStartOfNextNextMonth()); saveNewCounts(state, Arrays.asList(ApiUsageRecordKey.values())); - updateTenantState(state, tenantProfileCache.get(tenantId)); + updateTenantState((TenantApiUsageState) state, tenantProfileCache.get(tenantId)); } }); } finally { @@ -363,7 +385,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener keys) { + private void saveNewCounts(BaseApiUsageState state, List keys) { List counts = keys.stream() .map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L))) .collect(Collectors.toList()); @@ -371,13 +393,74 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener { + try { + return apiUsageStateService.createDefaultApiUsageState(tenantId, entityId); + } catch (Exception e) { + return apiUsageStateService.findApiUsageStateByEntityId(entityId); + } + }); + + switch (entityId.getEntityType()) { + case TENANT: + TenantProfile tenantProfile = tenantProfileCache.get(tenantId); + state = new TenantApiUsageState(tenantProfile, storedState); + break; + case CUSTOMER: + default: + state = new CustomerApiUsageState(storedState); + break; + } + + List newCounts = new ArrayList<>(); + try { + List dbValues = tsService.findAllLatest(tenantId, storedState.getId()).get(); + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + boolean cycleEntryFound = false; + boolean hourlyEntryFound = false; + for (TsKvEntry tsKvEntry : dbValues) { + if (tsKvEntry.getKey().equals(key.getApiCountKey())) { + cycleEntryFound = true; + + boolean oldCount = tsKvEntry.getTs() == state.getCurrentCycleTs(); + state.put(key, oldCount ? tsKvEntry.getLongValue().get() : 0L); + + if (!oldCount) { + newCounts.add(key); + } + } else if (tsKvEntry.getKey().equals(key.getApiCountKey() + HOURLY)) { + hourlyEntryFound = true; + state.putHourly(key, tsKvEntry.getTs() == state.getCurrentHourTs() ? tsKvEntry.getLongValue().get() : 0L); + } + if (cycleEntryFound && hourlyEntryFound) { + break; + } + } + } + log.debug("[{}] Initialized state: {}", entityId, storedState); + myUsageStates.put(entityId, state); + saveNewCounts(state, newCounts); + } catch (InterruptedException | ExecutionException e) { + log.warn("[{}] Failed to fetch api usage state from db.", tenantId, e); + } + + return state; + } + private TenantApiUsageState getOrFetchState(TenantId tenantId) { - TenantApiUsageState tenantState = myTenantStates.get(tenantId); + TenantApiUsageState tenantState = (TenantApiUsageState) myUsageStates.get(tenantId); if (tenantState == null) { ApiUsageState dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); if (dbStateEntity == null) { try { - dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId); + dbStateEntity = apiUsageStateService.createDefaultApiUsageState(tenantId, null); } catch (Exception e) { dbStateEntity = apiUsageStateService.findTenantApiUsageState(tenantId); } @@ -410,7 +493,7 @@ public class DefaultTbApiUsageStateService extends TbApplicationEventListener tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); List> futures = new ArrayList<>(); for (Tenant tenant : tenantIterator) { - if (!myTenantStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { + if (!myUsageStates.containsKey(tenant.getId()) && partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()).isMyPartition()) { log.debug("[{}] Initializing tenant state.", tenant.getId()); futures.add(tmpInitExecutor.submit(() -> { try { diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java b/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java index bf3ec13893..6687d184d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/TenantApiUsageState.java @@ -22,85 +22,23 @@ import org.thingsboard.server.common.data.ApiFeature; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.TenantProfile; -import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.tenant.profile.TenantProfileData; -import org.thingsboard.server.common.msg.tools.SchedulerUtils; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -public class TenantApiUsageState { - - private final Map currentCycleValues = new ConcurrentHashMap<>(); - private final Map currentHourValues = new ConcurrentHashMap<>(); +public class TenantApiUsageState extends BaseApiUsageState { @Getter @Setter private TenantProfileId tenantProfileId; @Getter @Setter private TenantProfileData tenantProfileData; - @Getter - private final ApiUsageState apiUsageState; - @Getter - private volatile long currentCycleTs; - @Getter - private volatile long nextCycleTs; - @Getter - private volatile long currentHourTs; public TenantApiUsageState(TenantProfile tenantProfile, ApiUsageState apiUsageState) { + super(apiUsageState); this.tenantProfileId = tenantProfile.getId(); this.tenantProfileData = tenantProfile.getProfileData(); - this.apiUsageState = apiUsageState; - this.currentCycleTs = SchedulerUtils.getStartOfCurrentMonth(); - this.nextCycleTs = SchedulerUtils.getStartOfNextMonth(); - this.currentHourTs = SchedulerUtils.getStartOfCurrentHour(); - } - - public void put(ApiUsageRecordKey key, Long value) { - currentCycleValues.put(key, value); - } - - public void putHourly(ApiUsageRecordKey key, Long value) { - currentHourValues.put(key, value); - } - - public long add(ApiUsageRecordKey key, long value) { - long result = currentCycleValues.getOrDefault(key, 0L) + value; - currentCycleValues.put(key, result); - return result; - } - - public long get(ApiUsageRecordKey key) { - return currentCycleValues.getOrDefault(key, 0L); - } - - public long addToHourly(ApiUsageRecordKey key, long value) { - long result = currentHourValues.getOrDefault(key, 0L) + value; - currentHourValues.put(key, result); - return result; - } - - public void setHour(long currentHourTs) { - this.currentHourTs = currentHourTs; - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - currentHourValues.put(key, 0L); - } - } - - public void setCycles(long currentCycleTs, long nextCycleTs) { - this.currentCycleTs = currentCycleTs; - this.nextCycleTs = nextCycleTs; - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - currentCycleValues.put(key, 0L); - } } public long getProfileThreshold(ApiUsageRecordKey key) { @@ -111,69 +49,7 @@ public class TenantApiUsageState { return tenantProfileData.getConfiguration().getWarnThreshold(key); } - public TenantId getTenantId() { - return apiUsageState.getTenantId(); - } - - public ApiUsageStateValue getFeatureValue(ApiFeature feature) { - switch (feature) { - case TRANSPORT: - return apiUsageState.getTransportState(); - case RE: - return apiUsageState.getReExecState(); - case DB: - return apiUsageState.getDbStorageState(); - case JS: - return apiUsageState.getJsExecState(); - case EMAIL: - return apiUsageState.getEmailExecState(); - case SMS: - return apiUsageState.getSmsExecState(); - default: - return ApiUsageStateValue.ENABLED; - } - } - - public boolean setFeatureValue(ApiFeature feature, ApiUsageStateValue value) { - ApiUsageStateValue currentValue = getFeatureValue(feature); - switch (feature) { - case TRANSPORT: - apiUsageState.setTransportState(value); - break; - case RE: - apiUsageState.setReExecState(value); - break; - case DB: - apiUsageState.setDbStorageState(value); - break; - case JS: - apiUsageState.setJsExecState(value); - break; - case EMAIL: - apiUsageState.setEmailExecState(value); - break; - case SMS: - apiUsageState.setSmsExecState(value); - break; - } - return !currentValue.equals(value); - } - - public Map checkStateUpdatedDueToThresholds() { - return checkStateUpdatedDueToThreshold(new HashSet<>(Arrays.asList(ApiFeature.values()))); - } - - public Map checkStateUpdatedDueToThreshold(Set features) { - Map result = new HashMap<>(); - for (ApiFeature feature : features) { - Pair tmp = checkStateUpdatedDueToThreshold(feature); - if (tmp != null) { - result.put(tmp.getFirst(), tmp.getSecond()); - } - } - return result; - } - + @Override public Pair checkStateUpdatedDueToThreshold(ApiFeature feature) { ApiUsageStateValue featureValue = ApiUsageStateValue.ENABLED; for (ApiUsageRecordKey recordKey : ApiUsageRecordKey.getKeys(feature)) { @@ -193,4 +69,9 @@ public class TenantApiUsageState { return setFeatureValue(feature, featureValue) ? Pair.of(feature, featureValue) : null; } + @Override + public EntityType getEntityType() { + return EntityType.TENANT; + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 70978df0a3..fb01926e2a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -920,7 +920,7 @@ public final class EdgeGrpcSession implements Closeable { try { if (uplinkMsg.getEntityDataCount() > 0) { for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) { - result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), entityData)); + result.addAll(ctx.getTelemetryProcessor().onTelemetryUpdate(edge.getTenantId(), edge.getCustomerId(), entityData)); } } if (uplinkMsg.getDeviceUpdateMsgCount() > 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java index 35eea3f962..0cc241c743 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/DeviceProcessor.java @@ -233,7 +233,7 @@ public class DeviceProcessor extends BaseProcessor { ObjectNode entityNode = mapper.valueToTree(device); TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, getActionTbMsgMetaData(edge, device.getCustomerId()), TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, new TbQueueCallback() { + tbClusterService.pushMsgToRuleEngine(tenantId, device.getCustomerId(), deviceId, tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { log.debug("Successfully send ENTITY_CREATED EVENT to rule engine [{}]", device); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java index c858955bfc..6667414815 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryProcessor.java @@ -70,7 +70,7 @@ public class TelemetryProcessor extends BaseProcessor { private final Gson gson = new Gson(); - public List> onTelemetryUpdate(TenantId tenantId, EntityDataProto entityData) { + public List> onTelemetryUpdate(TenantId tenantId, CustomerId customerId, EntityDataProto entityData) { log.trace("[{}] onTelemetryUpdate [{}]", tenantId, entityData); List> result = new ArrayList<>(); EntityId entityId = constructEntityId(entityData); @@ -80,14 +80,14 @@ public class TelemetryProcessor extends BaseProcessor { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE); if (entityData.hasPostAttributesMsg()) { - result.add(processPostAttributes(tenantId, entityId, entityData.getPostAttributesMsg(), metaData)); + result.add(processPostAttributes(tenantId, customerId, entityId, entityData.getPostAttributesMsg(), metaData)); } if (entityData.hasAttributesUpdatedMsg()) { metaData.putValue("scope", entityData.getPostAttributeScope()); - result.add(processAttributesUpdate(tenantId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); + result.add(processAttributesUpdate(tenantId, customerId, entityId, entityData.getAttributesUpdatedMsg(), metaData)); } if (entityData.hasPostTelemetryMsg()) { - result.add(processPostTelemetry(tenantId, entityId, entityData.getPostTelemetryMsg(), metaData)); + result.add(processPostTelemetry(tenantId, customerId, entityId, entityData.getPostTelemetryMsg(), metaData)); } } if (entityData.hasAttributeDeleteMsg()) { @@ -148,7 +148,7 @@ public class TelemetryProcessor extends BaseProcessor { } } - private ListenableFuture processPostTelemetry(TenantId tenantId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostTelemetry(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList()); @@ -157,7 +157,7 @@ public class TelemetryProcessor extends BaseProcessor { String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { futureToSet.set(null); @@ -173,14 +173,14 @@ public class TelemetryProcessor extends BaseProcessor { return futureToSet; } - private ListenableFuture processPostAttributes(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processPostAttributes(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); TbMsg tbMsg = TbMsg.newMsg(queueName, SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json), ruleChainId, null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { futureToSet.set(null); @@ -195,7 +195,7 @@ public class TelemetryProcessor extends BaseProcessor { return futureToSet; } - private ListenableFuture processAttributesUpdate(TenantId tenantId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { + private ListenableFuture processAttributesUpdate(TenantId tenantId, CustomerId customerId, EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Set attributes = JsonConverter.convertToAttributes(json); @@ -207,7 +207,7 @@ public class TelemetryProcessor extends BaseProcessor { String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); TbMsg tbMsg = TbMsg.newMsg(queueName, DataConstants.ATTRIBUTES_UPDATED, entityId, metaData, gson.toJson(json), ruleChainId, null); - tbClusterService.pushMsgToRuleEngine(tenantId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, tbMsg.getOriginator(), tbMsg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { futureToSet.set(null); diff --git a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java index 5eba80315c..b4a9d06583 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/SqlDatabaseUpgradeService.java @@ -390,7 +390,7 @@ public class SqlDatabaseUpgradeService implements DatabaseEntitiesUpgradeService pageData = tenantService.findTenants(pageLink); for (Tenant tenant : pageData.getData()) { try { - apiUsageStateService.createDefaultApiUsageState(tenant.getId()); + apiUsageStateService.createDefaultApiUsageState(tenant.getId(), null); } catch (Exception e) { } List deviceTypes = deviceService.findDeviceTypesByTenantId(tenant.getId()).get(); diff --git a/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java b/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java index 972c99f866..b4c1058616 100644 --- a/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java +++ b/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.ApiUsageStateMailMessage; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -233,7 +234,7 @@ public class DefaultMailService implements MailService { } @Override - public void send(TenantId tenantId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException { + public void send(TenantId tenantId, CustomerId customerId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException { if (apiUsageStateService.getApiUsageState(tenantId).isEmailSendEnabled()) { MimeMessage mailMsg = mailSender.createMimeMessage(); MimeMessageHelper helper = new MimeMessageHelper(mailMsg, "UTF-8"); @@ -248,7 +249,7 @@ public class DefaultMailService implements MailService { helper.setSubject(subject); helper.setText(body); mailSender.send(helper.getMimeMessage()); - apiUsageClient.report(tenantId, ApiUsageRecordKey.EMAIL_EXEC_COUNT, 1); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.EMAIL_EXEC_COUNT, 1); } else { throw new RuntimeException("Email sending is disabled due to API limits!"); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 2f5610d7ff..e61983b20d 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; @@ -133,7 +134,12 @@ public class DefaultTbClusterService implements TbClusterService { } @Override - public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { + public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback) { + pushMsgToRuleEngine(tenantId, null, entityId, msg, callback); + } + + @Override + public void pushMsgToRuleEngine(TenantId tenantId, CustomerId customerId, EntityId entityId, TbMsg tbMsg, TbQueueCallback callback) { if (tenantId.isNullUid()) { if (entityId.getEntityType().equals(EntityType.TENANT)) { tenantId = new TenantId(entityId.getId()); @@ -148,6 +154,7 @@ public class DefaultTbClusterService implements TbClusterService { tbMsg = transformMsg(tbMsg, deviceProfileCache.get(tenantId, new DeviceProfileId(entityId.getId()))); } } + tbMsg.setCustomerId(customerId); TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_RULE_ENGINE, tbMsg.getQueueName(), tenantId, entityId); log.trace("PUSHING msg: {} to:{}", tbMsg, tpi); ToRuleEngineMsg msg = ToRuleEngineMsg.newBuilder() diff --git a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java index c5848bed58..c66ca7a62f 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/TbClusterService.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.TbResource; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; @@ -50,6 +51,8 @@ public interface TbClusterService { void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbMsg msg, TbQueueCallback callback); + void pushMsgToRuleEngine(TenantId tenantId, CustomerId customerId, EntityId entityId, TbMsg msg, TbQueueCallback callback); + void pushNotificationToRuleEngine(String targetServiceId, FromDeviceRpcResponse response, TbQueueCallback callback); void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index a29cac7f47..08d4143140 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -34,6 +34,7 @@ import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.security.model.SecurityUser; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -95,11 +96,11 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } @Override - public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer) { + public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer, SecurityUser currentUser) { log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId()); UUID requestId = request.getId(); localToRuleEngineRpcRequests.put(requestId, responseConsumer); - sendRpcRequestToRuleEngine(request); + sendRpcRequestToRuleEngine(request, currentUser); scheduleToRuleEngineTimeout(request, requestId); } @@ -149,7 +150,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { } } - private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg) { + private void sendRpcRequestToRuleEngine(ToDeviceRpcRequest msg, SecurityUser currentUser) { ObjectNode entityNode = json.createObjectNode(); TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("requestUUID", msg.getId().toString()); @@ -168,7 +169,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { try { TbMsg tbMsg = TbMsg.newMsg(DataConstants.RPC_CALL_FROM_SERVER_TO_DEVICE, msg.getDeviceId(), metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); - clusterService.pushMsgToRuleEngine(msg.getTenantId(), msg.getDeviceId(), tbMsg, null); + clusterService.pushMsgToRuleEngine(msg.getTenantId(), currentUser.getCustomerId(), msg.getDeviceId(), tbMsg, null); } catch (JsonProcessingException e) { throw new RuntimeException(e); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java index 74e7072611..c3c6f9b888 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/TbCoreDeviceRpcService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.service.rpc; import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest; +import org.thingsboard.server.service.security.model.SecurityUser; import java.util.function.Consumer; @@ -27,11 +28,11 @@ public interface TbCoreDeviceRpcService { /** * Handles REST API calls that contain RPC requests to Device and pushes them to Rule Engine. * Schedules the timeout for the RPC call based on the {@link ToDeviceRpcRequest} - * - * @param request the RPC request + * @param request the RPC request * @param responseConsumer the consumer of the RPC response + * @param currentUser */ - void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer); + void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer responseConsumer, SecurityUser currentUser); /** * Handles the RPC response from the Rule Engine. diff --git a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java index 7342169a86..2f0378ae9d 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/AbstractJsInvokeService.java @@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; @@ -73,14 +74,14 @@ public abstract class AbstractJsInvokeService implements JsInvokeService { } @Override - public ListenableFuture invokeFunction(TenantId tenantId, UUID scriptId, Object... args) { + public ListenableFuture invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args) { if (apiUsageStateService.getApiUsageState(tenantId).isJsExecEnabled()) { String functionName = scriptIdToNameMap.get(scriptId); if (functionName == null) { return Futures.immediateFailedFuture(new RuntimeException("No compiled script found for scriptId: [" + scriptId + "]!")); } if (!isDisabled(scriptId)) { - apiUsageClient.report(tenantId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.JS_EXEC_COUNT, 1); return doInvokeFunction(scriptId, functionName, args); } else { return Futures.immediateFailedFuture( diff --git a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java index 1636b2cbf0..317170f1a7 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java +++ b/application/src/main/java/org/thingsboard/server/service/script/JsInvokeService.java @@ -16,7 +16,7 @@ package org.thingsboard.server.service.script; import com.google.common.util.concurrent.ListenableFuture; -import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import java.util.UUID; @@ -25,7 +25,7 @@ public interface JsInvokeService { ListenableFuture eval(TenantId tenantId, JsScriptType scriptType, String scriptBody, String... argNames); - ListenableFuture invokeFunction(TenantId tenantId, UUID scriptId, Object... args); + ListenableFuture invokeFunction(TenantId tenantId, CustomerId customerId, UUID scriptId, Object... args); ListenableFuture release(UUID scriptId); diff --git a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java index 5dffc0217c..066ce71a58 100644 --- a/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java +++ b/application/src/main/java/org/thingsboard/server/service/script/RuleNodeJsScriptEngine.java @@ -218,7 +218,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private JsonNode executeScript(TbMsg msg) throws ScriptException { try { String[] inArgs = prepareArgs(msg); - String eval = sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); + String eval = sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]).get().toString(); return mapper.readTree(eval); } catch (ExecutionException e) { if (e.getCause() instanceof ScriptException) { @@ -235,7 +235,7 @@ public class RuleNodeJsScriptEngine implements org.thingsboard.rule.engine.api.S private ListenableFuture executeScriptAsync(TbMsg msg) { String[] inArgs = prepareArgs(msg); - return Futures.transformAsync(sandboxService.invokeFunction(tenantId, this.scriptId, inArgs[0], inArgs[1], inArgs[2]), + return Futures.transformAsync(sandboxService.invokeFunction(tenantId, msg.getCustomerId(), this.scriptId, inArgs[0], inArgs[1], inArgs[2]), o -> { try { return Futures.immediateFuture(mapper.readTree(o.toString())); diff --git a/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java b/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java index f7a012b23e..f0b3eb174e 100644 --- a/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java +++ b/application/src/main/java/org/thingsboard/server/service/sms/DefaultSmsService.java @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.rule.engine.api.sms.SmsSender; import org.thingsboard.rule.engine.api.sms.SmsSenderFactory; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.sms.config.SmsProviderConfiguration; import org.thingsboard.server.common.data.sms.config.TestSmsRequest; import org.thingsboard.server.common.data.AdminSettings; @@ -94,7 +95,7 @@ public class DefaultSmsService implements SmsService { } @Override - public void sendSms(TenantId tenantId, String[] numbersTo, String message) throws ThingsboardException { + public void sendSms(TenantId tenantId, CustomerId customerId, String[] numbersTo, String message) throws ThingsboardException { if (apiUsageStateService.getApiUsageState(tenantId).isSmsSendEnabled()) { int smsCount = 0; try { @@ -103,7 +104,7 @@ public class DefaultSmsService implements SmsService { } } finally { if (smsCount > 0) { - apiUsageClient.report(tenantId, ApiUsageRecordKey.SMS_EXEC_COUNT, smsCount); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.SMS_EXEC_COUNT, smsCount); } } } else { diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index c8a4e80e0e..6ccfd255ed 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -468,6 +468,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener ts, FutureCallback callback) { - saveAndNotify(tenantId, entityId, ts, 0L, callback); + saveAndNotify(tenantId, null, entityId, ts, 0L, callback); } @Override - public void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { + public void saveAndNotify(TenantId tenantId, CustomerId customerId, EntityId entityId, List ts, long ttl, FutureCallback callback) { checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { @@ -127,7 +125,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void onSuccess(Integer result) { if (!sysTenant && result != null && result > 0) { - apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); } callback.onSuccess(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java index 6944809515..96bc2fc02b 100644 --- a/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java +++ b/application/src/main/java/org/thingsboard/server/service/transport/DefaultTransportApiService.java @@ -81,6 +81,7 @@ import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.state.DeviceStateService; +import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -273,7 +274,7 @@ public class DefaultTransportApiService implements TransportApiService { DeviceId deviceId = device.getId(); ObjectNode entityNode = mapper.valueToTree(device); TbMsg tbMsg = TbMsg.newMsg(DataConstants.ENTITY_CREATED, deviceId, metaData, TbMsgDataType.JSON, mapper.writeValueAsString(entityNode)); - tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null); + tbClusterService.pushMsgToRuleEngine(tenantId, customerId, deviceId, tbMsg, null); } GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder() .setDeviceInfo(getDeviceInfoProto(device)); @@ -411,6 +412,8 @@ public class DefaultTransportApiService implements TransportApiService { return DeviceInfoProto.newBuilder() .setTenantIdMSB(device.getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(device.getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(device.getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(device.getCustomerId().getId().getLeastSignificantBits()) .setDeviceIdMSB(device.getId().getId().getMostSignificantBits()) .setDeviceIdLSB(device.getId().getId().getLeastSignificantBits()) .setDeviceName(device.getName()) diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 2249940c48..2532fbc00d 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -21,12 +21,12 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; @@ -750,7 +750,7 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { private void sendTelemetry(Device device, List tsData) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - tsService.saveAndNotify(device.getTenantId(), device.getId(), tsData, 0, new FutureCallback() { + tsService.saveAndNotify(device.getTenantId(), null, device.getId(), tsData, 0, new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { latch.countDown(); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java index 5b0985e66c..6cd9c62aef 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateService.java @@ -17,16 +17,19 @@ package org.thingsboard.server.dao.usagerecord; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; public interface ApiUsageStateService { - ApiUsageState createDefaultApiUsageState(TenantId id); + ApiUsageState createDefaultApiUsageState(TenantId id, EntityId entityId); ApiUsageState update(ApiUsageState apiUsageState); ApiUsageState findTenantApiUsageState(TenantId tenantId); + ApiUsageState findApiUsageStateByEntityId(EntityId entityId); + void deleteApiUsageStateByTenantId(TenantId tenantId); ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id); diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java index 66a3670ddd..072626b04e 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsg.java @@ -23,6 +23,7 @@ import lombok.Builder; import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.RuleChainId; @@ -47,6 +48,7 @@ public final class TbMsg implements Serializable { private final long ts; private final String type; private final EntityId originator; + private CustomerId customerId; private final TbMsgMetaData metaData; private final TbMsgDataType dataType; private final String data; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java index 01a322f158..556b9b88ff 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/DefaultTbApiUsageClient.java @@ -19,6 +19,9 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -31,6 +34,7 @@ import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.scheduler.SchedulerComponent; import javax.annotation.PostConstruct; +import java.util.EnumMap; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -47,8 +51,9 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { @Value("${usage.stats.report.interval:10}") private int interval; - @SuppressWarnings("unchecked") - private final ConcurrentMap[] values = new ConcurrentMap[ApiUsageRecordKey.values().length]; + private final EnumMap> stats = new EnumMap<>(ApiUsageRecordKey.class); + private final ConcurrentMap tenants = new ConcurrentHashMap<>(); + private final PartitionService partitionService; private final SchedulerComponent scheduler; private final TbQueueProducerProvider producerProvider; @@ -65,55 +70,88 @@ public class DefaultTbApiUsageClient implements TbApiUsageClient { if (enabled) { msgProducer = this.producerProvider.getTbUsageStatsMsgProducer(); for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - values[key.ordinal()] = new ConcurrentHashMap<>(); + stats.put(key, new ConcurrentHashMap<>()); } - scheduler.scheduleWithFixedDelay(this::reportStats, new Random().nextInt(interval), interval, TimeUnit.SECONDS); + scheduler.scheduleWithFixedDelay(() -> { + try { + reportStats(); + } catch (Exception e) { + log.warn("Failed to report statistics: ", e); + } + }, new Random().nextInt(interval), interval, TimeUnit.SECONDS); } } private void reportStats() { - try { - ConcurrentMap report = new ConcurrentHashMap<>(); + ConcurrentMap report = new ConcurrentHashMap<>(); - for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { - values[key.ordinal()].forEach(((tenantId, atomicLong) -> { - long value = atomicLong.getAndSet(0); - if (value > 0) { - ToUsageStatsServiceMsg.Builder msgBuilder = report.computeIfAbsent(tenantId, id -> { - ToUsageStatsServiceMsg.Builder msg = ToUsageStatsServiceMsg.newBuilder(); - msg.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); - msg.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); - return msg; - }); - msgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build()); + for (ApiUsageRecordKey key : ApiUsageRecordKey.values()) { + ConcurrentMap statsForKey = stats.get(key); + statsForKey.forEach((initiatorId, statsValue) -> { + long value = statsValue.get(); + if (value == 0) return; + + ToUsageStatsServiceMsg.Builder statsMsgBuilder = report.computeIfAbsent(initiatorId, id -> { + ToUsageStatsServiceMsg.Builder newStatsMsgBuilder = ToUsageStatsServiceMsg.newBuilder(); + + TenantId tenantId; + if (initiatorId.getEntityType() == EntityType.TENANT) { + tenantId = (TenantId) initiatorId; + } else { + tenantId = tenants.get(initiatorId); } - })); + + newStatsMsgBuilder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); + newStatsMsgBuilder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); + + if (initiatorId.getEntityType() == EntityType.CUSTOMER) { + newStatsMsgBuilder.setCustomerIdMSB(initiatorId.getId().getMostSignificantBits()); + newStatsMsgBuilder.setCustomerIdLSB(initiatorId.getId().getLeastSignificantBits()); + } + + return newStatsMsgBuilder; + }); + + statsMsgBuilder.addValues(UsageStatsKVProto.newBuilder().setKey(key.name()).setValue(value).build()); + }); + statsForKey.clear(); + } + + report.forEach(((initiatorId, builder) -> { + //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? + + TenantId tenantId; + if (initiatorId.getEntityType() == EntityType.TENANT) { + tenantId = (TenantId) initiatorId; + } else { + tenantId = tenants.get(initiatorId); } - report.forEach(((tenantId, builder) -> { - //TODO: figure out how to minimize messages into the queue. Maybe group by 100s of messages? - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, tenantId).newByTopic(msgProducer.getDefaultTopic()); - msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null); - })); - if (!report.isEmpty()) { - log.info("Report statistics for: {} tenants", report.size()); - } - } catch (Exception e) { - log.warn("Failed to report statistics: ", e); + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, initiatorId).newByTopic(msgProducer.getDefaultTopic()); + msgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), builder.build()), null); + })); + + if (!report.isEmpty()) { + log.info("Reporting API usage statistics for {} tenants and customers", report.size()); } } @Override - public void report(TenantId tenantId, ApiUsageRecordKey key, long value) { + public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value) { if (enabled) { - ConcurrentMap map = values[key.ordinal()]; - AtomicLong atomicValue = map.computeIfAbsent(tenantId, id -> new AtomicLong()); - atomicValue.addAndGet(value); + ConcurrentMap statsForKey = stats.get(key); + + statsForKey.computeIfAbsent(tenantId, id -> new AtomicLong()).addAndGet(value); + + if (customerId != null && !customerId.isNullUid()) { + statsForKey.computeIfAbsent(customerId, id -> new AtomicLong()).addAndGet(value); + tenants.putIfAbsent(customerId, tenantId); + } } } @Override - public void report(TenantId tenantId, ApiUsageRecordKey key) { - report(tenantId, key, 1L); + public void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key) { + report(tenantId, customerId, key, 1); } } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java index f46a2d0143..be1b071873 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/usagestats/TbApiUsageClient.java @@ -16,12 +16,13 @@ package org.thingsboard.server.queue.usagestats; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; public interface TbApiUsageClient { - void report(TenantId tenantId, ApiUsageRecordKey key, long value); + void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key, long value); - void report(TenantId tenantId, ApiUsageRecordKey key); + void report(TenantId tenantId, CustomerId customerId, ApiUsageRecordKey key); } diff --git a/common/queue/src/main/proto/queue.proto b/common/queue/src/main/proto/queue.proto index 2150992bbb..d599031b34 100644 --- a/common/queue/src/main/proto/queue.proto +++ b/common/queue/src/main/proto/queue.proto @@ -53,6 +53,8 @@ message SessionInfoProto { int64 gwSessionIdLSB = 11; int64 deviceProfileIdMSB = 12; int64 deviceProfileIdLSB = 13; + int64 customerIdMSB = 14; + int64 customerIdLSB = 15; } enum SessionEvent { @@ -110,6 +112,8 @@ message DeviceInfoProto { string additionalInfo = 7; int64 deviceProfileIdMSB = 8; int64 deviceProfileIdLSB = 9; + int64 customerIdMSB = 10; + int64 customerIdLSB = 11; } /** @@ -638,4 +642,6 @@ message ToUsageStatsServiceMsg { int64 entityIdMSB = 3; int64 entityIdLSB = 4; repeated UsageStatsKVProto values = 5; + int64 customerIdMSB = 6; + int64 customerIdLSB = 7; } diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java index b175ca8580..63d19b9c30 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/SessionInfoCreator.java @@ -40,6 +40,8 @@ public class SessionInfoCreator { .setDeviceIdLSB(msg.getDeviceInfo().getDeviceId().getId().getLeastSignificantBits()) .setTenantIdMSB(msg.getDeviceInfo().getTenantId().getId().getMostSignificantBits()) .setTenantIdLSB(msg.getDeviceInfo().getTenantId().getId().getLeastSignificantBits()) + .setCustomerIdMSB(msg.getDeviceInfo().getCustomerId().getId().getMostSignificantBits()) + .setCustomerIdLSB(msg.getDeviceInfo().getCustomerId().getId().getLeastSignificantBits()) .setDeviceName(msg.getDeviceInfo().getDeviceName()) .setDeviceType(msg.getDeviceInfo().getDeviceType()) .setDeviceProfileIdMSB(msg.getDeviceInfo().getDeviceProfileId().getId().getMostSignificantBits()) diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java index 20d01420c4..27d42d867f 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/auth/TransportDeviceInfo.java @@ -16,6 +16,7 @@ package org.thingsboard.server.common.transport.auth; import lombok.Data; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.TenantId; @@ -24,6 +25,7 @@ import org.thingsboard.server.common.data.id.TenantId; public class TransportDeviceInfo { private TenantId tenantId; + private CustomerId customerId; private DeviceProfileId deviceProfileId; private DeviceId deviceId; private String deviceName; diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 740a2a622f..dfc0e6400d 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -33,8 +33,10 @@ import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ResourceType; import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; @@ -359,6 +361,7 @@ public class DefaultTransportService implements TransportService { private TransportDeviceInfo getTransportDeviceInfo(TransportProtos.DeviceInfoProto di) { TransportDeviceInfo tdi = new TransportDeviceInfo(); tdi.setTenantId(new TenantId(new UUID(di.getTenantIdMSB(), di.getTenantIdLSB()))); + tdi.setCustomerId(new CustomerId(new UUID(di.getCustomerIdMSB(), di.getCustomerIdLSB()))); tdi.setDeviceId(new DeviceId(new UUID(di.getDeviceIdMSB(), di.getDeviceIdLSB()))); tdi.setDeviceProfileId(new DeviceProfileId(new UUID(di.getDeviceProfileIdMSB(), di.getDeviceProfileIdLSB()))); tdi.setAdditionalInfo(di.getAdditionalInfo()); @@ -404,9 +407,9 @@ public class DefaultTransportService implements TransportService { } if (checkLimits(sessionInfo, msg, callback, dataPoints)) { reportActivityInternal(sessionInfo); - TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + TenantId tenantId = getTenantId(sessionInfo); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); - MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, dataPoints, callback)); + MsgPackCallback packCallback = new MsgPackCallback(msg.getTsKvListCount(), new ApiStatsProxyCallback<>(tenantId, getCustomerId(sessionInfo), dataPoints, callback)); for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) { TbMsgMetaData metaData = new TbMsgMetaData(); metaData.putValue("deviceName", sessionInfo.getDeviceName()); @@ -423,7 +426,7 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostAttributeMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback, msg.getKvCount())) { reportActivityInternal(sessionInfo); - TenantId tenantId = new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); + TenantId tenantId = getTenantId(sessionInfo); DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); TbMsgMetaData metaData = new TbMsgMetaData(); @@ -431,7 +434,7 @@ public class DefaultTransportService implements TransportService { metaData.putValue("deviceType", sessionInfo.getDeviceType()); metaData.putValue("notifyDevice", "false"); sendToRuleEngine(tenantId, deviceId, sessionInfo, json, metaData, SessionMsgType.POST_ATTRIBUTES_REQUEST, - new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, msg.getKvList().size(), callback))); + new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, getCustomerId(sessionInfo), msg.getKvList().size(), callback))); } } @@ -440,7 +443,7 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + .setGetAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -449,8 +452,8 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToAttributes(!msg.getUnsubscribe()); - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToAttributes(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToAttributes(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -459,8 +462,8 @@ public class DefaultTransportService implements TransportService { if (checkLimits(sessionInfo, msg, callback)) { SessionMetaData sessionMetaData = reportActivityInternal(sessionInfo); sessionMetaData.setSubscribedToRPC(!msg.getUnsubscribe()); - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setSubscribeToRPC(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setSubscribeToRPC(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -468,8 +471,8 @@ public class DefaultTransportService implements TransportService { public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.ToDeviceRpcResponseMsg msg, TransportServiceCallback callback) { if (checkLimits(sessionInfo, msg, callback)) { reportActivityInternal(sessionInfo); - sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo) - .setToDeviceRPCCallResponse(msg).build(), new ApiStatsProxyCallback<>(getTenantId(sessionInfo), 1, callback)); + sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo).setToDeviceRPCCallResponse(msg).build(), + new ApiStatsProxyCallback<>(getTenantId(sessionInfo), getCustomerId(sessionInfo), 1, callback)); } } @@ -797,6 +800,16 @@ public class DefaultTransportService implements TransportService { return new TenantId(new UUID(sessionInfo.getTenantIdMSB(), sessionInfo.getTenantIdLSB())); } + protected CustomerId getCustomerId(TransportProtos.SessionInfoProto sessionInfo) { + long msb = sessionInfo.getCustomerIdMSB(); + long lsb = sessionInfo.getCustomerIdLSB(); + if (msb != 0 && lsb != 0) { + return new CustomerId(new UUID(msb, lsb)); + } else { + return new CustomerId(EntityId.NULL_UUID); + } + } + protected DeviceId getDeviceId(TransportProtos.SessionInfoProto sessionInfo) { return new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); } @@ -922,11 +935,13 @@ public class DefaultTransportService implements TransportService { private class ApiStatsProxyCallback implements TransportServiceCallback { private final TenantId tenantId; + private final CustomerId customerId; private final int dataPoints; private final TransportServiceCallback callback; - public ApiStatsProxyCallback(TenantId tenantId, int dataPoints, TransportServiceCallback callback) { + public ApiStatsProxyCallback(TenantId tenantId, CustomerId customerId, int dataPoints, TransportServiceCallback callback) { this.tenantId = tenantId; + this.customerId = customerId; this.dataPoints = dataPoints; this.callback = callback; } @@ -934,8 +949,8 @@ public class DefaultTransportService implements TransportService { @Override public void onSuccess(T msg) { try { - apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1); - apiUsageClient.report(tenantId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.TRANSPORT_MSG_COUNT, 1); + apiUsageClient.report(tenantId, customerId, ApiUsageRecordKey.TRANSPORT_DP_COUNT, dataPoints); } finally { callback.onSuccess(msg); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java index 46cc6eaa83..ac0aea02da 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/usagerecord/ApiUsageStateRepository.java @@ -33,6 +33,8 @@ public interface ApiUsageStateRepository extends CrudRepository { */ ApiUsageState findTenantApiUsageState(UUID tenantId); + ApiUsageState findApiUsageStateByEntityId(EntityId entityId); + /** * Delete usage record by tenantId. * diff --git a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java index f263de1505..a1e882607f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/usagerecord/ApiUsageStateServiceImpl.java @@ -25,6 +25,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.ApiUsageStateId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -68,12 +69,12 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A } @Override - public ApiUsageState createDefaultApiUsageState(TenantId tenantId) { + public ApiUsageState createDefaultApiUsageState(TenantId tenantId, EntityId entityId) { log.trace("Executing createDefaultUsageRecord [{}]", tenantId); validateId(tenantId, INCORRECT_TENANT_ID + tenantId); ApiUsageState apiUsageState = new ApiUsageState(); apiUsageState.setTenantId(tenantId); - apiUsageState.setEntityId(tenantId); + apiUsageState.setEntityId(entityId); apiUsageState.setTransportState(ApiUsageStateValue.ENABLED); apiUsageState.setReExecState(ApiUsageStateValue.ENABLED); apiUsageState.setJsExecState(ApiUsageStateValue.ENABLED); @@ -87,6 +88,7 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A Tenant tenant = tenantDao.findById(tenantId, tenantId.getId()); TenantProfile tenantProfile = tenantProfileDao.findById(tenantId, tenant.getTenantProfileId().getId()); TenantProfileConfiguration configuration = tenantProfile.getProfileData().getConfiguration(); + List apiUsageStates = new ArrayList<>(); apiUsageStates.add(new BasicTsKvEntry(saved.getCreatedTime(), new StringDataEntry(ApiFeature.TRANSPORT.getApiStateKey(), ApiUsageStateValue.ENABLED.name()))); @@ -126,6 +128,12 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A return apiUsageStateDao.findTenantApiUsageState(tenantId.getId()); } + @Override + public ApiUsageState findApiUsageStateByEntityId(EntityId entityId) { + validateId(entityId.getId(), "Invalid entity id"); + return apiUsageStateDao.findApiUsageStateByEntityId(entityId); + } + @Override public ApiUsageState findApiUsageStateById(TenantId tenantId, ApiUsageStateId id) { log.trace("Executing findApiUsageStateById, tenantId [{}], apiUsageStateId [{}]", tenantId, id); @@ -148,10 +156,8 @@ public class ApiUsageStateServiceImpl extends AbstractEntityService implements A } if (apiUsageState.getEntityId() == null) { throw new DataValidationException("UsageRecord should be assigned to entity!"); - } else if (!EntityType.TENANT.equals(apiUsageState.getEntityId().getEntityType())) { - throw new DataValidationException("Only Tenant Usage Records are supported!"); - } else if (!apiUsageState.getTenantId().getId().equals(apiUsageState.getEntityId().getId())) { - throw new DataValidationException("Can't assign one Usage Record to multiple tenants!"); + } else if (apiUsageState.getEntityId().getEntityType() != EntityType.TENANT && apiUsageState.getEntityId().getEntityType() != EntityType.CUSTOMER) { + throw new DataValidationException("Only Tenant and Customer Usage Records are supported!"); } } }; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java index 7160c150f4..0b957c8b23 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/MailService.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.ApiFeature; import org.thingsboard.server.common.data.ApiUsageStateMailMessage; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import javax.mail.MessagingException; @@ -40,7 +41,7 @@ public interface MailService { void sendPasswordWasResetEmail(String loginLink, String email) throws ThingsboardException; - void send(TenantId tenantId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException; + void send(TenantId tenantId, CustomerId customerId, String from, String to, String cc, String bcc, String subject, String body) throws MessagingException; void sendAccountLockoutEmail( String lockoutEmail, String email, Integer maxFailedLoginAttempts) throws ThingsboardException; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 224a10f701..e641c83144 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -31,7 +32,7 @@ public interface RuleEngineTelemetryService { void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback); + void saveAndNotify(TenantId tenantId, CustomerId id, EntityId entityId, List ts, long ttl, FutureCallback callback); void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List attributes, FutureCallback callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java index e0190177f0..caf96a68e6 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/SmsService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.rule.engine.api; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.sms.config.TestSmsRequest; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.TenantId; @@ -23,7 +24,7 @@ public interface SmsService { void updateSmsConfiguration(); - void sendSms(TenantId tenantId, String[] numbersTo, String message) throws ThingsboardException;; + void sendSms(TenantId tenantId, CustomerId customerId, String[] numbersTo, String message) throws ThingsboardException;; void sendTestSms(TestSmsRequest testSmsRequest) throws ThingsboardException; diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java index 22aefd16f1..45e6efadb2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/mail/TbSendEmailNode.java @@ -26,7 +26,6 @@ import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; @@ -76,7 +75,7 @@ public class TbSendEmailNode implements TbNode { validateType(msg.getType()); EmailPojo email = getEmail(msg); withCallback(ctx.getMailExecutor().executeAsync(() -> { - sendEmail(ctx, email); + sendEmail(ctx, msg, email); return null; }), ok -> ctx.tellSuccess(msg), @@ -86,9 +85,9 @@ public class TbSendEmailNode implements TbNode { } } - private void sendEmail(TbContext ctx, EmailPojo email) throws Exception { + private void sendEmail(TbContext ctx, TbMsg msg, EmailPojo email) throws Exception { if (this.config.isUseSystemSmtpSettings()) { - ctx.getMailService().send(ctx.getTenantId(), email.getFrom(), email.getTo(), email.getCc(), + ctx.getMailService().send(ctx.getTenantId(), msg.getCustomerId(), email.getFrom(), email.getTo(), email.getCc(), email.getBcc(), email.getSubject(), email.getBody()); } else { MimeMessage mailMsg = mailSender.createMimeMessage(); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java index df75afe086..22d293c69a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/sms/TbSendSmsNode.java @@ -76,7 +76,7 @@ public class TbSendSmsNode implements TbNode { String message = TbNodeUtils.processPattern(this.config.getSmsMessageTemplate(), msg); String[] numbersToList = numbersTo.split(","); if (this.config.isUseSystemSmsSettings()) { - ctx.getSmsService().sendSms(ctx.getTenantId(), numbersToList, message); + ctx.getSmsService().sendSms(ctx.getTenantId(), msg.getCustomerId(), numbersToList, message); } else { for (String numberTo : numbersToList) { this.smsSender.sendSms(numberTo, message); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index dc657cfce6..c99a575cc3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -25,6 +25,7 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -93,7 +94,7 @@ public class TbMsgTimeseriesNode implements TbNode { if (ttl == 0L) { ttl = tenantProfileDefaultStorageTtl; } - ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); + ctx.getTelemetryService().saveAndNotify(ctx.getTenantId(), msg.getCustomerId(), msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); } public static long getTs(TbMsg msg) {