diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index e645339e0b..afe929f6b1 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.DeviceId; @@ -510,14 +511,25 @@ class DefaultTbContext implements TbContext { mainCtx.getRuleNodeStateService().removeByRuleNodeIdAndEntityId(getTenantId(), getSelfId(), entityId); } + @Override + public void addTenantProfileListener(Consumer listener) { + mainCtx.getTenantProfileCache().addListener(getTenantId(), getSelfId(), listener); + } + @Override public void addDeviceProfileListeners(Consumer profileListener, BiConsumer deviceListener) { mainCtx.getDeviceProfileCache().addListener(getTenantId(), getSelfId(), profileListener, deviceListener); } @Override - public void removeProfileListener() { + public void removeListeners() { mainCtx.getDeviceProfileCache().removeListener(getTenantId(), getSelfId()); + mainCtx.getTenantProfileCache().removeListener(getTenantId(), getSelfId()); + } + + @Override + public TenantProfile getTenantProfile() { + return mainCtx.getTenantProfileCache().get(getTenantId()); } private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) { diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index fb5fade521..2141027fa7 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -45,6 +45,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.DeviceId; @@ -69,6 +70,7 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -93,6 +95,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** @@ -205,7 +208,7 @@ public class TelemetryController extends BaseController { @RequestParam(name = "interval", defaultValue = "0") Long interval, @RequestParam(name = "limit", defaultValue = "100") Integer limit, @RequestParam(name = "agg", defaultValue = "NONE") String aggStr, - @RequestParam(name= "orderBy", defaultValue = "DESC") String orderBy, + @RequestParam(name = "orderBy", defaultValue = "DESC") String orderBy, @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException { return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, (result, tenantId, entityId) -> { @@ -392,7 +395,7 @@ public class TelemetryController extends BaseController { if (attributes.isEmpty()) { return getImmediateDeferredResult("No attributes data found in request body!", HttpStatus.BAD_REQUEST); } - for (AttributeKvEntry attributeKvEntry: attributes) { + for (AttributeKvEntry attributeKvEntry : attributes) { if (attributeKvEntry.getKey().isEmpty() || attributeKvEntry.getKey().trim().length() == 0) { return getImmediateDeferredResult("Key cannot be empty or contains only spaces", HttpStatus.BAD_REQUEST); } @@ -440,9 +443,13 @@ public class TelemetryController extends BaseController { if (entries.isEmpty()) { return getImmediateDeferredResult("No timeseries data found in request body!", HttpStatus.BAD_REQUEST); } - SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_TELEMETRY, entityIdSrc, (result, tenantId, entityId) -> { - tsSubService.saveAndNotify(tenantId, entityId, entries, ttl, new FutureCallback() { + long tenantTtl = ttl; + if (!TenantId.SYS_TENANT_ID.equals(tenantId) && tenantTtl == 0) { + TenantProfile tenantProfile = tenantProfileCache.get(tenantId); + tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); + } + tsSubService.saveAndNotify(tenantId, entityId, entries, tenantTtl, new FutureCallback() { @Override public void onSuccess(@Nullable Void tmp) { result.setResult(new ResponseEntity(HttpStatus.OK)); diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 5ef5adc386..234b64f94e 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -521,27 +521,27 @@ public class DefaultDeviceStateService implements DeviceStateService { private void save(DeviceId deviceId, String key, long value) { if (persistToTelemetry) { - tsSubService.saveAndNotify( + tsSubService.saveAndNotifyInternal( TenantId.SYS_TENANT_ID, deviceId, Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new LongDataEntry(key, value))), - new AttributeSaveCallback(deviceId, key, value)); + new AttributeSaveCallback<>(deviceId, key, value)); } else { - tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value)); + tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback<>(deviceId, key, value)); } } private void save(DeviceId deviceId, String key, boolean value) { if (persistToTelemetry) { - tsSubService.saveAndNotify( + tsSubService.saveAndNotifyInternal( TenantId.SYS_TENANT_ID, deviceId, Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), - new AttributeSaveCallback(deviceId, key, value)); + new AttributeSaveCallback<>(deviceId, key, value)); } else { - tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback(deviceId, key, value)); + tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, key, value, new AttributeSaveCallback<>(deviceId, key, value)); } } - private static class AttributeSaveCallback implements FutureCallback { + private static class AttributeSaveCallback implements FutureCallback { private final DeviceId deviceId; private final String key; private final Object value; @@ -553,7 +553,7 @@ public class DefaultDeviceStateService implements DeviceStateService { } @Override - public void onSuccess(@Nullable Void result) { + public void onSuccess(@Nullable T result) { log.trace("[{}] Successfully updated attribute [{}] with value [{}]", deviceId, key, value); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 7a35e6b317..5b3ad05ee9 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -25,6 +25,7 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -34,13 +35,14 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entityview.EntityViewService; +import org.thingsboard.server.dao.tenant.TbTenantProfileCache; import org.thingsboard.server.dao.timeseries.TimeseriesService; -import org.thingsboard.server.dao.usagerecord.ApiUsageStateService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; @@ -119,11 +121,12 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void saveAndNotify(TenantId tenantId, EntityId entityId, List ts, long ttl, FutureCallback callback) { checkInternalEntity(entityId); - if (apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { + boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; + if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { saveAndNotifyInternal(tenantId, entityId, ts, ttl, new FutureCallback() { @Override public void onSuccess(Integer result) { - if (result != null && result > 0) { + if (!sysTenant && result != null && result > 0) { apiUsageClient.report(tenantId, ApiUsageRecordKey.STORAGE_DP_COUNT, result); } callback.onSuccess(null); @@ -134,7 +137,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer callback.onFailure(t); } }); - } else{ + } else { callback.onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); } } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/tenant/TbTenantProfileCache.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/tenant/TbTenantProfileCache.java index 735e23e73a..ea3d65eb49 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/tenant/TbTenantProfileCache.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/tenant/TbTenantProfileCache.java @@ -16,9 +16,12 @@ package org.thingsboard.server.dao.tenant; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; +import java.util.function.Consumer; + public interface TbTenantProfileCache { TenantProfile get(TenantId tenantId); @@ -31,4 +34,8 @@ public interface TbTenantProfileCache { void evict(TenantId id); + void addListener(TenantId tenantId, EntityId listenerId, Consumer profileListener); + + void removeListener(TenantId tenantId, EntityId listenerId); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/tenant/DefaultTbTenantProfileCache.java b/dao/src/main/java/org/thingsboard/server/dao/tenant/DefaultTbTenantProfileCache.java index adf05e0f74..b468019872 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/tenant/DefaultTbTenantProfileCache.java +++ b/dao/src/main/java/org/thingsboard/server/dao/tenant/DefaultTbTenantProfileCache.java @@ -19,16 +19,15 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; -import org.thingsboard.server.dao.tenant.TbTenantProfileCache; -import org.thingsboard.server.dao.tenant.TenantProfileService; -import org.thingsboard.server.dao.tenant.TenantService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; @Service @Slf4j @@ -40,6 +39,7 @@ public class DefaultTbTenantProfileCache implements TbTenantProfileCache { private final ConcurrentMap tenantProfilesMap = new ConcurrentHashMap<>(); private final ConcurrentMap tenantsMap = new ConcurrentHashMap<>(); + private final ConcurrentMap>> profileListeners = new ConcurrentHashMap<>(); public DefaultTbTenantProfileCache(TenantProfileService tenantProfileService, TenantService tenantService) { this.tenantProfileService = tenantProfileService; @@ -85,17 +85,56 @@ public class DefaultTbTenantProfileCache implements TbTenantProfileCache { public void put(TenantProfile profile) { if (profile.getId() != null) { tenantProfilesMap.put(profile.getId(), profile); + notifyTenantListeners(profile); } } @Override public void evict(TenantProfileId profileId) { tenantProfilesMap.remove(profileId); + notifyTenantListeners(get(profileId)); + } + + public void notifyTenantListeners(TenantProfile tenantProfile) { + if (tenantProfile != null) { + tenantsMap.forEach(((tenantId, tenantProfileId) -> { + if (tenantProfileId.equals(tenantProfile.getId())) { + ConcurrentMap> tenantListeners = profileListeners.get(tenantId); + if (tenantListeners != null) { + tenantListeners.forEach((id, listener) -> listener.accept(tenantProfile)); + } + } + })); + } } @Override public void evict(TenantId tenantId) { tenantsMap.remove(tenantId); + TenantProfile tenantProfile = get(tenantId); + if (tenantProfile != null) { + ConcurrentMap> tenantListeners = profileListeners.get(tenantId); + if (tenantListeners != null) { + tenantListeners.forEach((id, listener) -> listener.accept(tenantProfile)); + } + } + } + + @Override + public void addListener(TenantId tenantId, EntityId listenerId, Consumer profileListener) { + //Force cache of the tenant id. + get(tenantId); + if (profileListener != null) { + profileListeners.computeIfAbsent(tenantId, id -> new ConcurrentHashMap<>()).put(listenerId, profileListener); + } + } + + @Override + public void removeListener(TenantId tenantId, EntityId listenerId) { + ConcurrentMap> tenantListeners = profileListeners.get(tenantId); + if (tenantListeners != null) { + tenantListeners.remove(listenerId); + } } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index eb494af569..8dd9add1bd 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.DeviceId; @@ -237,7 +238,11 @@ public interface TbContext { void clearRuleNodeStates(); + void addTenantProfileListener(Consumer listener); + void addDeviceProfileListeners(Consumer listener, BiConsumer deviceListener); - void removeProfileListener(); + void removeListeners(); + + TenantProfile getTenantProfile(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java index b4bb19438f..27852316af 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java @@ -149,7 +149,7 @@ public class TbDeviceProfileNode implements TbNode { @Override public void destroy() { - ctx.removeProfileListener(); + ctx.removeListeners(); deviceStates.clear(); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 6042ac9939..e803392d5b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -24,6 +24,7 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.TenantProfile; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; @@ -31,10 +32,12 @@ import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; +import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; @Slf4j @RuleNode( @@ -50,12 +53,20 @@ import java.util.Map; public class TbMsgTimeseriesNode implements TbNode { private TbMsgTimeseriesNodeConfiguration config; + private TbContext ctx; private long tenantProfileDefaultStorageTtl; @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { this.config = TbNodeUtils.convert(configuration, TbMsgTimeseriesNodeConfiguration.class); + this.ctx = ctx; + ctx.addTenantProfileListener(this::onTenantProfileUpdate); + onTenantProfileUpdate(ctx.getTenantProfile()); + } + void onTenantProfileUpdate(TenantProfile tenantProfile) { + DefaultTenantProfileConfiguration configuration = (DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration(); + tenantProfileDefaultStorageTtl = TimeUnit.DAYS.toSeconds(configuration.getDefaultStorageTtlDays()); } @Override @@ -101,6 +112,7 @@ public class TbMsgTimeseriesNode implements TbNode { @Override public void destroy() { + ctx.removeListeners(); } }