diff --git a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java index ad734a5c07..7fd42740c4 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java +++ b/application/src/main/java/org/thingsboard/server/service/device/DeviceProvisionServiceImpl.java @@ -210,7 +210,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService { } } - private ListenableFuture> saveProvisionStateAttribute(Device device) { + private ListenableFuture> saveProvisionStateAttribute(Device device) { return attributesService.save(device.getTenantId(), device.getId(), DataConstants.SERVER_SCOPE, Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE), System.currentTimeMillis()))); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index de0cea14d6..6c991444b2 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -309,10 +309,10 @@ public final class EdgeGrpcSession implements Closeable { public void onSuccess(@Nullable UUID ifOffset) { if (ifOffset != null) { Long newStartTs = Uuids.unixTimestamp(ifOffset); - ListenableFuture> updateFuture = updateQueueStartTs(newStartTs); + ListenableFuture> updateFuture = updateQueueStartTs(newStartTs); Futures.addCallback(updateFuture, new FutureCallback<>() { @Override - public void onSuccess(@Nullable List list) { + public void onSuccess(@Nullable List list) { log.debug("[{}] queue offset was updated [{}][{}]", sessionId, ifOffset, newStartTs); result.set(null); } @@ -496,7 +496,7 @@ public final class EdgeGrpcSession implements Closeable { }, ctx.getGrpcCallbackExecutorService()); } - private ListenableFuture> updateQueueStartTs(Long newStartTs) { + private ListenableFuture> updateQueueStartTs(Long newStartTs) { log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs); List attributes = Collections.singletonList( new BaseAttributeKvEntry( diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java index 9a9b568a23..a72f41c887 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/TelemetryEdgeProcessor.java @@ -206,10 +206,10 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor { SettableFuture futureToSet = SettableFuture.create(); JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); Set attributes = JsonConverter.convertToAttributes(json); - ListenableFuture> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); - Futures.addCallback(future, new FutureCallback>() { + ListenableFuture> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes)); + Futures.addCallback(future, new FutureCallback<>() { @Override - public void onSuccess(@Nullable List voids) { + public void onSuccess(@Nullable List keys) { Pair defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId); String queueName = defaultQueueAndRuleChain.getKey(); RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue(); diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java index c5bdf722af..9e7636879f 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultSystemDataLoaderService.java @@ -532,7 +532,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService { Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); } else { - ListenableFuture> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, + ListenableFuture> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE, Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value) , System.currentTimeMillis()))); addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value)); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 440c9075e8..38275c09b6 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -243,7 +243,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List attributes, boolean notifyDevice, FutureCallback callback) { - ListenableFuture> saveFuture = attrService.save(tenantId, entityId, scope, attributes); + ListenableFuture> saveFuture = attrService.save(tenantId, entityId, scope, attributes); addVoidCallback(saveFuture, callback); addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice)); } @@ -269,7 +269,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, FutureCallback callback) { - ListenableFuture> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); + ListenableFuture> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); addVoidCallback(deleteFuture, callback); addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys)); } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java index 79e19d5c82..40cce12a12 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractRuleEngineControllerTest.java @@ -57,12 +57,18 @@ public abstract class AbstractRuleEngineControllerTest extends AbstractControlle } protected PageData getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception { + return getEvents(tenantId, entityId, DataConstants.DEBUG_RULE_NODE, limit); + } + + protected PageData getEvents(TenantId tenantId, EntityId entityId, String eventType, int limit) throws Exception { TimePageLink pageLink = new TimePageLink(limit); return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&", new TypeReference>() { - }, pageLink, entityId.getEntityType(), entityId.getId(), DataConstants.DEBUG_RULE_NODE, tenantId.getId()); + }, pageLink, entityId.getEntityType(), entityId.getId(), eventType, tenantId.getId()); } + + protected JsonNode getMetadata(Event outEvent) { String metaDataStr = outEvent.getBody().get("metadata").asText(); try { diff --git a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java index c3a9d9817a..0f08e618c3 100644 --- a/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/lifecycle/AbstractRuleEngineLifecycleIntegrationTest.java @@ -105,17 +105,20 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac Assert.assertNotNull(ruleChainFinal.getFirstRuleNodeId()); //TODO find out why RULE_NODE update event did not appear all the time -// List rcEvents = Awaitility.await("get debug by rule chain") -// .pollInterval(10, MILLISECONDS) -// .atMost(TIMEOUT, TimeUnit.SECONDS) -// .until(() -> { -// List debugEvents = getDebugEvents(tenantId, ruleChainFinal.getFirstRuleNodeId(), 1000) -// .getData().stream().filter(x->true).collect(Collectors.toList()); -// log.warn("filtered debug events [{}]", debugEvents.size()); -// debugEvents.forEach((e) -> log.warn("event: {}", e)); -// return debugEvents; -// }, -// x -> x.size() >= 2); + List rcEvents = Awaitility.await("Rule Node started successfully") + .pollInterval(10, MILLISECONDS) + .atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> { + List debugEvents = getEvents(tenantId, ruleChainFinal.getFirstRuleNodeId(), DataConstants.LC_EVENT, 1000) + .getData().stream().filter(e-> { + var body = e.getBody(); + return body.has("event") && body.get("event").asText().equals("STARTED") + && body.has("success") && body.get("success").asBoolean(); + }).collect(Collectors.toList()); + debugEvents.forEach((e) -> log.trace("event: {}", e)); + return debugEvents; + }, + x -> x.size() == 1); // Saving the device Device device = new Device(); @@ -133,6 +136,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(tenantId, tbMsg, null, null); // Pushing Message to the system + log.warn("before tell tbMsgCallback"); actorSystem.tell(qMsg); log.warn("awaiting tbMsgCallback"); Mockito.verify(tbMsgCallback, Mockito.timeout(TimeUnit.SECONDS.toMillis(TIMEOUT))).onSuccess(); diff --git a/application/src/test/resources/logback-test.xml b/application/src/test/resources/logback-test.xml index d3301bf660..f051721391 100644 --- a/application/src/test/resources/logback-test.xml +++ b/application/src/test/resources/logback-test.xml @@ -9,7 +9,8 @@ - + + diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 5ae8f68eb1..6497778676 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -37,9 +37,9 @@ public interface AttributesService { ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String scope); - ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes); + ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes); - ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys); + ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesCacheWrapper.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesCacheWrapper.java index 6d620800ca..466cd4b0f1 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesCacheWrapper.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesCacheWrapper.java @@ -1,63 +1,15 @@ -/** - * Copyright © 2016-2022 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ package org.thingsboard.server.dao.attributes; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cache.Cache; -import org.springframework.cache.CacheManager; -import org.springframework.context.annotation.Primary; -import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE; +public interface AttributesCacheWrapper { -@Service -@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true") -@Primary -@Slf4j -public class AttributesCacheWrapper { - private final Cache attributesCache; + Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey); - public AttributesCacheWrapper(CacheManager cacheManager) { - this.attributesCache = cacheManager.getCache(ATTRIBUTES_CACHE); - } + void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry); - public Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey) { - try { - return attributesCache.get(attributeCacheKey); - } catch (Exception e) { - log.debug("Failed to retrieve element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage()); - return null; - } - } + void putIfAbsent(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry); - public void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) { - try { - attributesCache.put(attributeCacheKey, attributeKvEntry); - } catch (Exception e) { - log.debug("Failed to put element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage()); - } - } - - public void evict(AttributeCacheKey attributeCacheKey) { - try { - attributesCache.evict(attributeCacheKey); - } catch (Exception e) { - log.debug("Failed to evict element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage()); - } - } + void evict(AttributeCacheKey attributeCacheKey); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index e3819489de..efa4a30ce4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -37,9 +37,9 @@ public interface AttributesDao { ListenableFuture> findAll(TenantId tenantId, EntityId entityId, String attributeType); - ListenableFuture save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute); + ListenableFuture save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute); - ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys); + List> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 8d6c57e709..508cf4c16a 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -80,17 +80,16 @@ public class BaseAttributesService implements AttributesService { } @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { + public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { validate(entityId, scope); - attributes.forEach(attribute -> validate(attribute)); - - List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); + attributes.forEach(AttributeUtils::validate); + List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); return Futures.allAsList(saveFutures); } @Override - public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { + public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); - return attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); + return Futures.allAsList(attributesDao.removeAll(tenantId, entityId, scope, attributeKeys)); } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index feb4b0e275..abe741d6d7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.stats.DefaultCounter; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.cache.CacheExecutorService; @@ -88,9 +87,9 @@ public class CachedAttributesService implements AttributesService { /** * Will return: - * - for the local cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread) - * - for the remote cache: dedicated thread pool for the cache IO calls to unblock any caller thread - * */ + * - for the local cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread) + * - for the remote cache: dedicated thread pool for the cache IO calls to unblock any caller thread + */ Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) { if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) { log.info("Going to use directExecutor for the local cache type {}", cacheType); @@ -116,7 +115,7 @@ public class CachedAttributesService implements AttributesService { ListenableFuture> result = attributesDao.find(tenantId, entityId, scope, attributeKey); return Futures.transform(result, foundAttrKvEntry -> { // TODO: think if it's a good idea to store 'empty' attributes - cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null)); + cacheWrapper.putIfAbsent(attributeCacheKey, foundAttrKvEntry.orElse(null)); return foundAttrKvEntry; }, cacheExecutor); } @@ -162,12 +161,12 @@ public class CachedAttributesService implements AttributesService { private List mergeDbAndCacheAttributes(EntityId entityId, String scope, List cachedAttributes, Set notFoundAttributeKeys, List foundInDbAttributes) { for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) { AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey()); - cacheWrapper.put(attributeCacheKey, foundInDbAttribute); + cacheWrapper.putIfAbsent(attributeCacheKey, foundInDbAttribute); notFoundAttributeKeys.remove(foundInDbAttribute.getKey()); } - for (String key : notFoundAttributeKeys){ - cacheWrapper.put(new AttributeCacheKey(scope, entityId, key), null); - } +// for (String key : notFoundAttributeKeys) { +// cacheWrapper.putIfAbsent(new AttributeCacheKey(scope, entityId, key), null); +// } List mergedAttributes = new ArrayList<>(cachedAttributes); mergedAttributes.addAll(foundInDbAttributes); return mergedAttributes; @@ -190,34 +189,30 @@ public class CachedAttributesService implements AttributesService { } @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { + public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { validate(entityId, scope); attributes.forEach(AttributeUtils::validate); - List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList()); - ListenableFuture> future = Futures.allAsList(saveFutures); + List> futures = new ArrayList<>(attributes.size()); + for (var attribute : attributes) { + ListenableFuture future = attributesDao.save(tenantId, entityId, scope, attribute); + futures.add(Futures.transform(future, key -> { + cacheWrapper.evict(new AttributeCacheKey(scope, entityId, key)); + return key; + }, cacheExecutor)); + } - // TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache - List attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor); - return future; + return Futures.allAsList(futures); } @Override - public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { + public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { validate(entityId, scope); - ListenableFuture> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); - future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor); - return future; + List> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys); + return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> { + cacheWrapper.evict(new AttributeCacheKey(scope, entityId, key)); + return key; + }, cacheExecutor)).collect(Collectors.toList())); } - private void evictAttributesFromCache(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { - try { - for (String attributeKey : attributeKeys) { - cacheWrapper.evict(new AttributeCacheKey(scope, entityId, attributeKey)); - } - } catch (Exception e) { - log.error("[{}][{}] Failed to remove values from cache.", tenantId, entityId, e); - } - } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/DefaultAttributesCacheWrapper.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/DefaultAttributesCacheWrapper.java new file mode 100644 index 0000000000..aa0222551a --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/DefaultAttributesCacheWrapper.java @@ -0,0 +1,63 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.attributes; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cache.Cache; +import org.springframework.cache.CacheManager; +import org.springframework.context.annotation.Primary; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; + +import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE; + +@Service +@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true") +@Primary +@Slf4j +public class DefaultAttributesCacheWrapper implements AttributesCacheWrapper { + private final Cache attributesCache; + + public DefaultAttributesCacheWrapper(CacheManager cacheManager) { + this.attributesCache = cacheManager.getCache(ATTRIBUTES_CACHE); + } + + @Override + public Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey) { + var result = attributesCache.get(attributeCacheKey); + log.warn("[{}] Get = {}", attributeCacheKey, result); + return result; + } + + @Override + public void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) { + log.warn("[{}] Put = {}", attributeCacheKey, attributeKvEntry); + attributesCache.put(attributeCacheKey, attributeKvEntry); + } + + @Override + public void putIfAbsent(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) { + var result = attributesCache.putIfAbsent(attributeCacheKey, attributeKvEntry); + log.warn("[{}] Put if absent = {}, result = {}", attributeCacheKey, attributeKvEntry, result); + } + + @Override + public void evict(AttributeCacheKey attributeCacheKey) { + log.warn("[{}] Evict", attributeCacheKey); + attributesCache.evict(attributeCacheKey); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index da119d453e..a5f0e04700 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -29,6 +29,7 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.cache.annotation.Caching; import org.springframework.dao.ConcurrencyFailureException; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.EntityId; @@ -48,6 +49,8 @@ import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.ConstraintValidator; import javax.annotation.Nullable; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -86,11 +89,7 @@ public class BaseRelationService implements RelationService { @Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}") @Override public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { - try { - return getRelationAsync(tenantId, from, to, relationType, typeGroup).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + return getRelation(tenantId, from, to, relationType, typeGroup); } @Override @@ -108,6 +107,7 @@ public class BaseRelationService implements RelationService { @CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}") }) @Override + @Transactional public boolean saveRelation(TenantId tenantId, EntityRelation relation) { log.trace("Executing saveRelation [{}]", relation); validate(relation); @@ -181,6 +181,7 @@ public class BaseRelationService implements RelationService { public ListenableFuture deleteRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { log.trace("Executing deleteRelationAsync [{}][{}][{}][{}]", from, to, relationType, typeGroup); validate(from, to, relationType, typeGroup); + //TODO: clear cache using transform return relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup); } @@ -319,11 +320,8 @@ public class BaseRelationService implements RelationService { public List findByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup) { validate(from); validateTypeGroup(typeGroup); - try { - return relationDao.findAllByFromAsync(tenantId, from, typeGroup).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + log.trace("[{}] Find by from: [{}][{}]: ", tenantId, from, typeGroup, new RuntimeException()); + return relationDao.findAllByFrom(tenantId, from, typeGroup); } @Override @@ -345,7 +343,7 @@ public class BaseRelationService implements RelationService { } else { ListenableFuture> relationsFuture = relationDao.findAllByFromAsync(tenantId, from, typeGroup); Futures.addCallback(relationsFuture, - new FutureCallback>() { + new FutureCallback<>() { @Override public void onSuccess(@Nullable List result) { cache.putIfAbsent(fromAndTypeGroup, result); @@ -401,11 +399,7 @@ public class BaseRelationService implements RelationService { public List findByTo(TenantId tenantId, EntityId to, RelationTypeGroup typeGroup) { validate(to); validateTypeGroup(typeGroup); - try { - return relationDao.findAllByToAsync(tenantId, to, typeGroup).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } + return relationDao.findAllByTo(tenantId, to, typeGroup); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java index 94ce9efd20..c1ee7f3fd3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlBlockingQueue.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -66,7 +66,10 @@ public class TbSqlBlockingQueue implements TbSqlQueue { } queue.drainTo(entities, batchSize - 1); boolean fullPack = entities.size() == batchSize; - log.debug("[{}] Going to save {} entities", logName, entities.size()); + if (log.isDebugEnabled()) { + log.debug("[{}] Going to save {} entities", logName, entities.size()); + log.trace("[{}] Going to save entities: {}", logName, entities); + } Stream entitiesStream = entities.stream().map(TbSqlQueueElement::getEntity); saveFunction.accept( (params.isBatchSortEnabled() ? entitiesStream.sorted(batchUpdateComparator) : entitiesStream) diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java index 0de2d839ed..db15026819 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/TbSqlQueueElement.java @@ -17,7 +17,9 @@ package org.thingsboard.server.dao.sql; import com.google.common.util.concurrent.SettableFuture; import lombok.Getter; +import lombok.ToString; +@ToString(exclude = "future") public final class TbSqlQueueElement { @Getter private final SettableFuture future; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index 20b9e0d1f7..185dae1c2f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.attributes; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -39,6 +40,7 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; @@ -153,7 +155,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl } @Override - public ListenableFuture save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute) { + public ListenableFuture save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute) { AttributeKvEntity entity = new AttributeKvEntity(); entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), entityId.getId(), attributeType, attribute.getKey())); entity.setLastUpdateTs(attribute.getLastUpdateTs()); @@ -165,18 +167,20 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl return addToQueue(entity); } - private ListenableFuture addToQueue(AttributeKvEntity entity) { - return queue.add(entity); + private ListenableFuture addToQueue(AttributeKvEntity entity) { + return Futures.transform(queue.add(entity), v -> entity.getId().getAttributeKey(), MoreExecutors.directExecutor()); } @Override - public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys) { - return service.submit(() -> { - keys.forEach(key -> - attributeKvRepository.delete(entityId.getEntityType(), entityId.getId(), attributeType, key) - ); - return null; - }); + public List> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List keys) { + List> futuresList = new ArrayList<>(keys.size()); + for (String key : keys) { + futuresList.add(service.submit(() -> { + attributeKvRepository.delete(entityId.getEntityType(), entityId.getId(), attributeType, key); + return key; + })); + } + return futuresList; } private AttributeKvCompositeKey getAttributeKvCompositeKey(EntityId entityId, String attributeType, String attributeKey) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseEntityServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseEntityServiceTest.java index 430ac85391..808e499239 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseEntityServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseEntityServiceTest.java @@ -339,12 +339,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { List highTemperatures = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); - List>> attributeFutures = new ArrayList<>(); + List>> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), DataConstants.CLIENT_SCOPE)); } - Futures.successfulAsList(attributeFutures).get(); + Futures.allAsList(attributeFutures).get(); RelationsQueryFilter filter = new RelationsQueryFilter(); filter.setRootEntity(tenantId); @@ -518,12 +518,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { List highTemperatures = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures); - List>> attributeFutures = new ArrayList<>(); + List>> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), DataConstants.CLIENT_SCOPE)); } - Futures.successfulAsList(attributeFutures).get(); + Futures.allAsList(attributeFutures).get(); DeviceSearchQueryFilter filter = new DeviceSearchQueryFilter(); filter.setRootEntity(tenantId); @@ -593,12 +593,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { List highConsumptions = new ArrayList<>(); createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>()); - List>> attributeFutures = new ArrayList<>(); + List>> attributeFutures = new ArrayList<>(); for (int i = 0; i < assets.size(); i++) { Asset asset = assets.get(i); attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), DataConstants.SERVER_SCOPE)); } - Futures.successfulAsList(attributeFutures).get(); + Futures.allAsList(attributeFutures).get(); AssetSearchQueryFilter filter = new AssetSearchQueryFilter(); filter.setRootEntity(tenantId); @@ -1048,14 +1048,14 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { } } - List>> attributeFutures = new ArrayList<>(); + List>> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); for (String currentScope : DataConstants.allScopes()) { attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), currentScope)); } } - Futures.successfulAsList(attributeFutures).get(); + Futures.allAsList(attributeFutures).get(); DeviceTypeFilter filter = new DeviceTypeFilter(); filter.setDeviceType("default"); @@ -1150,12 +1150,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { } } - List>> attributeFutures = new ArrayList<>(); + List>> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), DataConstants.CLIENT_SCOPE)); } - Futures.successfulAsList(attributeFutures).get(); + Futures.allAsList(attributeFutures).get(); DeviceTypeFilter filter = new DeviceTypeFilter(); filter.setDeviceType("default"); @@ -1301,7 +1301,7 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { Device device = devices.get(i); timeseriesFutures.add(saveLongTimeseries(device.getId(), "temperature", temperatures.get(i))); } - Futures.successfulAsList(timeseriesFutures).get(); + Futures.allAsList(timeseriesFutures).get(); DeviceTypeFilter filter = new DeviceTypeFilter(); filter.setDeviceType("default"); @@ -1428,12 +1428,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { } } - List>> attributeFutures = new ArrayList<>(); + List>> attributeFutures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { Device device = devices.get(i); attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), DataConstants.CLIENT_SCOPE)); } - Futures.successfulAsList(attributeFutures).get(); + Futures.allAsList(attributeFutures).get(); DeviceTypeFilter filter = new DeviceTypeFilter(); filter.setDeviceType("default"); @@ -1764,13 +1764,13 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest { return filter; } - private ListenableFuture> saveLongAttribute(EntityId entityId, String key, long value, String scope) { + private ListenableFuture> saveLongAttribute(EntityId entityId, String key, long value, String scope) { KvEntry attrValue = new LongDataEntry(key, value); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr)); } - private ListenableFuture> saveStringAttribute(EntityId entityId, String key, String value, String scope) { + private ListenableFuture> saveStringAttribute(EntityId entityId, String key, String value, String scope) { KvEntry attrValue = new StringDataEntry(key, value); AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L); return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr)); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java index 1c38a88709..f5752154c9 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/metadata/TbAbstractGetAttributesNode.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.metadata; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.json.JsonWriteFeature; import com.fasterxml.jackson.databind.ObjectMapper; @@ -24,6 +23,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.gson.JsonParseException; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.BooleanUtils; import org.thingsboard.rule.engine.api.TbContext; @@ -50,6 +50,7 @@ import static org.thingsboard.server.common.data.DataConstants.LATEST_TS; import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE; import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE; +@Slf4j public abstract class TbAbstractGetAttributesNode implements TbNode { private static ObjectMapper mapper = new ObjectMapper(); @@ -112,6 +113,7 @@ public abstract class TbAbstractGetAttributesNode> attributeKvEntryListFuture = ctx.getAttributesService().find(ctx.getTenantId(), entityId, scope, keys); return Futures.transform(attributeKvEntryListFuture, attributeKvEntryList -> { + log.warn("[{}][{}][{}][{}] Lookup attribute result: {}", ctx.getTenantId(), entityId, scope, keys, attributeKvEntryList); if (!CollectionUtils.isEmpty(attributeKvEntryList)) { List existingAttributesKvEntry = attributeKvEntryList.stream().filter(attributeKvEntry -> keys.contains(attributeKvEntry.getKey())).collect(Collectors.toList()); existingAttributesKvEntry.forEach(kvEntry -> msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()));