Investigating the cache issues

This commit is contained in:
Andrii Shvaika 2022-04-29 16:12:22 +03:00
parent 23c774610a
commit 15104028ea
20 changed files with 188 additions and 163 deletions

View File

@ -210,7 +210,7 @@ public class DeviceProvisionServiceImpl implements DeviceProvisionService {
}
}
private ListenableFuture<List<Void>> saveProvisionStateAttribute(Device device) {
private ListenableFuture<List<String>> saveProvisionStateAttribute(Device device) {
return attributesService.save(device.getTenantId(), device.getId(), DataConstants.SERVER_SCOPE,
Collections.singletonList(new BaseAttributeKvEntry(new StringDataEntry(DEVICE_PROVISION_STATE, PROVISIONED_STATE),
System.currentTimeMillis())));

View File

@ -309,10 +309,10 @@ public final class EdgeGrpcSession implements Closeable {
public void onSuccess(@Nullable UUID ifOffset) {
if (ifOffset != null) {
Long newStartTs = Uuids.unixTimestamp(ifOffset);
ListenableFuture<List<Void>> updateFuture = updateQueueStartTs(newStartTs);
ListenableFuture<List<String>> updateFuture = updateQueueStartTs(newStartTs);
Futures.addCallback(updateFuture, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> list) {
public void onSuccess(@Nullable List<String> list) {
log.debug("[{}] queue offset was updated [{}][{}]", sessionId, ifOffset, newStartTs);
result.set(null);
}
@ -496,7 +496,7 @@ public final class EdgeGrpcSession implements Closeable {
}, ctx.getGrpcCallbackExecutorService());
}
private ListenableFuture<List<Void>> updateQueueStartTs(Long newStartTs) {
private ListenableFuture<List<String>> updateQueueStartTs(Long newStartTs) {
log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs);
List<AttributeKvEntry> attributes = Collections.singletonList(
new BaseAttributeKvEntry(

View File

@ -206,10 +206,10 @@ public class TelemetryEdgeProcessor extends BaseEdgeProcessor {
SettableFuture<Void> futureToSet = SettableFuture.create();
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(json);
ListenableFuture<List<Void>> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes));
Futures.addCallback(future, new FutureCallback<List<Void>>() {
ListenableFuture<List<String>> future = attributesService.save(tenantId, entityId, metaData.getValue("scope"), new ArrayList<>(attributes));
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> voids) {
public void onSuccess(@Nullable List<String> keys) {
Pair<String, RuleChainId> defaultQueueAndRuleChain = getDefaultQueueNameAndRuleChainId(tenantId, entityId);
String queueName = defaultQueueAndRuleChain.getKey();
RuleChainId ruleChainId = defaultQueueAndRuleChain.getValue();

View File

@ -532,7 +532,7 @@ public class DefaultSystemDataLoaderService implements SystemDataLoaderService {
Collections.singletonList(new BasicTsKvEntry(System.currentTimeMillis(), new BooleanDataEntry(key, value))), 0L);
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));
} else {
ListenableFuture<List<Void>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE,
ListenableFuture<List<String>> saveFuture = attributesService.save(TenantId.SYS_TENANT_ID, deviceId, DataConstants.SERVER_SCOPE,
Collections.singletonList(new BaseAttributeKvEntry(new BooleanDataEntry(key, value)
, System.currentTimeMillis())));
addTsCallback(saveFuture, new TelemetrySaveCallback<>(deviceId, key, value));

View File

@ -243,7 +243,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
ListenableFuture<List<String>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
}
@ -269,7 +269,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys));
}

View File

@ -57,12 +57,18 @@ public abstract class AbstractRuleEngineControllerTest extends AbstractControlle
}
protected PageData<Event> getDebugEvents(TenantId tenantId, EntityId entityId, int limit) throws Exception {
return getEvents(tenantId, entityId, DataConstants.DEBUG_RULE_NODE, limit);
}
protected PageData<Event> getEvents(TenantId tenantId, EntityId entityId, String eventType, int limit) throws Exception {
TimePageLink pageLink = new TimePageLink(limit);
return doGetTypedWithTimePageLink("/api/events/{entityType}/{entityId}/{eventType}?tenantId={tenantId}&",
new TypeReference<PageData<Event>>() {
}, pageLink, entityId.getEntityType(), entityId.getId(), DataConstants.DEBUG_RULE_NODE, tenantId.getId());
}, pageLink, entityId.getEntityType(), entityId.getId(), eventType, tenantId.getId());
}
protected JsonNode getMetadata(Event outEvent) {
String metaDataStr = outEvent.getBody().get("metadata").asText();
try {

View File

@ -105,17 +105,20 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
Assert.assertNotNull(ruleChainFinal.getFirstRuleNodeId());
//TODO find out why RULE_NODE update event did not appear all the time
// List<Event> rcEvents = Awaitility.await("get debug by rule chain")
// .pollInterval(10, MILLISECONDS)
// .atMost(TIMEOUT, TimeUnit.SECONDS)
// .until(() -> {
// List<Event> debugEvents = getDebugEvents(tenantId, ruleChainFinal.getFirstRuleNodeId(), 1000)
// .getData().stream().filter(x->true).collect(Collectors.toList());
// log.warn("filtered debug events [{}]", debugEvents.size());
// debugEvents.forEach((e) -> log.warn("event: {}", e));
// return debugEvents;
// },
// x -> x.size() >= 2);
List<Event> rcEvents = Awaitility.await("Rule Node started successfully")
.pollInterval(10, MILLISECONDS)
.atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> {
List<Event> debugEvents = getEvents(tenantId, ruleChainFinal.getFirstRuleNodeId(), DataConstants.LC_EVENT, 1000)
.getData().stream().filter(e-> {
var body = e.getBody();
return body.has("event") && body.get("event").asText().equals("STARTED")
&& body.has("success") && body.get("success").asBoolean();
}).collect(Collectors.toList());
debugEvents.forEach((e) -> log.trace("event: {}", e));
return debugEvents;
},
x -> x.size() == 1);
// Saving the device
Device device = new Device();
@ -133,6 +136,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac
TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback);
QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(tenantId, tbMsg, null, null);
// Pushing Message to the system
log.warn("before tell tbMsgCallback");
actorSystem.tell(qMsg);
log.warn("awaiting tbMsgCallback");
Mockito.verify(tbMsgCallback, Mockito.timeout(TimeUnit.SECONDS.toMillis(TIMEOUT))).onSuccess();

View File

@ -9,7 +9,8 @@
<!-- <logger name="org.thingsboard.server.service.subscription" level="TRACE"/>-->
<logger name="org.thingsboard.server.controller.TbTestWebSocketClient" level="INFO"/>
<logger name="org.thingsboard.server" level="WARN"/>
<logger name="org.thingsboard.server.queue" level="INFO"/>
<logger name="org.thingsboard.server" level="TRACE"/>
<logger name="org.springframework" level="WARN"/>
<logger name="org.springframework.boot.test" level="WARN"/>
<logger name="org.apache.cassandra" level="WARN"/>

View File

@ -37,9 +37,9 @@ public interface AttributesService {
ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String scope);
ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes);
ListenableFuture<List<String>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes);
ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys);
ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);

View File

@ -1,63 +1,15 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE;
public interface AttributesCacheWrapper {
@Service
@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true")
@Primary
@Slf4j
public class AttributesCacheWrapper {
private final Cache attributesCache;
Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey);
public AttributesCacheWrapper(CacheManager cacheManager) {
this.attributesCache = cacheManager.getCache(ATTRIBUTES_CACHE);
}
void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry);
public Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey) {
try {
return attributesCache.get(attributeCacheKey);
} catch (Exception e) {
log.debug("Failed to retrieve element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage());
return null;
}
}
void putIfAbsent(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry);
public void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) {
try {
attributesCache.put(attributeCacheKey, attributeKvEntry);
} catch (Exception e) {
log.debug("Failed to put element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage());
}
}
public void evict(AttributeCacheKey attributeCacheKey) {
try {
attributesCache.evict(attributeCacheKey);
} catch (Exception e) {
log.debug("Failed to evict element from cache for key {}. Reason - {}.", attributeCacheKey, e.getMessage());
}
}
void evict(AttributeCacheKey attributeCacheKey);
}

View File

@ -37,9 +37,9 @@ public interface AttributesDao {
ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, String attributeType);
ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute);
ListenableFuture<String> save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute);
ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List<String> keys);
List<ListenableFuture<String>> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List<String> keys);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -80,17 +80,16 @@ public class BaseAttributesService implements AttributesService {
}
@Override
public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
public ListenableFuture<List<String>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
attributes.forEach(attribute -> validate(attribute));
List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList());
attributes.forEach(AttributeUtils::validate);
List<ListenableFuture<String>> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList());
return Futures.allAsList(saveFutures);
}
@Override
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
return attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
return Futures.allAsList(attributesDao.removeAll(tenantId, entityId, scope, attributeKeys));
}
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -30,7 +30,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.stats.DefaultCounter;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.cache.CacheExecutorService;
@ -88,9 +87,9 @@ public class CachedAttributesService implements AttributesService {
/**
* Will return:
* - for the <b>local</b> cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread)
* - for the <b>remote</b> cache: dedicated thread pool for the cache IO calls to unblock any caller thread
* */
* - for the <b>local</b> cache type (cache.type="coffeine"): directExecutor (run callback immediately in the same thread)
* - for the <b>remote</b> cache: dedicated thread pool for the cache IO calls to unblock any caller thread
*/
Executor getExecutor(String cacheType, CacheExecutorService cacheExecutorService) {
if (StringUtils.isEmpty(cacheType) || LOCAL_CACHE_TYPE.equals(cacheType)) {
log.info("Going to use directExecutor for the local cache type {}", cacheType);
@ -116,7 +115,7 @@ public class CachedAttributesService implements AttributesService {
ListenableFuture<Optional<AttributeKvEntry>> result = attributesDao.find(tenantId, entityId, scope, attributeKey);
return Futures.transform(result, foundAttrKvEntry -> {
// TODO: think if it's a good idea to store 'empty' attributes
cacheWrapper.put(attributeCacheKey, foundAttrKvEntry.orElse(null));
cacheWrapper.putIfAbsent(attributeCacheKey, foundAttrKvEntry.orElse(null));
return foundAttrKvEntry;
}, cacheExecutor);
}
@ -162,12 +161,12 @@ public class CachedAttributesService implements AttributesService {
private List<AttributeKvEntry> mergeDbAndCacheAttributes(EntityId entityId, String scope, List<AttributeKvEntry> cachedAttributes, Set<String> notFoundAttributeKeys, List<AttributeKvEntry> foundInDbAttributes) {
for (AttributeKvEntry foundInDbAttribute : foundInDbAttributes) {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey());
cacheWrapper.put(attributeCacheKey, foundInDbAttribute);
cacheWrapper.putIfAbsent(attributeCacheKey, foundInDbAttribute);
notFoundAttributeKeys.remove(foundInDbAttribute.getKey());
}
for (String key : notFoundAttributeKeys){
cacheWrapper.put(new AttributeCacheKey(scope, entityId, key), null);
}
// for (String key : notFoundAttributeKeys) {
// cacheWrapper.putIfAbsent(new AttributeCacheKey(scope, entityId, key), null);
// }
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(foundInDbAttributes);
return mergedAttributes;
@ -190,34 +189,30 @@ public class CachedAttributesService implements AttributesService {
}
@Override
public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
public ListenableFuture<List<String>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
attributes.forEach(AttributeUtils::validate);
List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, scope, attribute)).collect(Collectors.toList());
ListenableFuture<List<Void>> future = Futures.allAsList(saveFutures);
List<ListenableFuture<String>> futures = new ArrayList<>(attributes.size());
for (var attribute : attributes) {
ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute);
futures.add(Futures.transform(future, key -> {
cacheWrapper.evict(new AttributeCacheKey(scope, entityId, key));
return key;
}, cacheExecutor));
}
// TODO: can do if (attributesCache.get() != null) attributesCache.put() instead, but will be more twice more requests to cache
List<String> attributeKeys = attributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor);
return future;
return Futures.allAsList(futures);
}
@Override
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
ListenableFuture<List<Void>> future = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
future.addListener(() -> evictAttributesFromCache(tenantId, entityId, scope, attributeKeys), cacheExecutor);
return future;
List<ListenableFuture<String>> futures = attributesDao.removeAll(tenantId, entityId, scope, attributeKeys);
return Futures.allAsList(futures.stream().map(future -> Futures.transform(future, key -> {
cacheWrapper.evict(new AttributeCacheKey(scope, entityId, key));
return key;
}, cacheExecutor)).collect(Collectors.toList()));
}
private void evictAttributesFromCache(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
try {
for (String attributeKey : attributeKeys) {
cacheWrapper.evict(new AttributeCacheKey(scope, entityId, attributeKey));
}
} catch (Exception e) {
log.error("[{}][{}] Failed to remove values from cache.", tenantId, entityId, e);
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Copyright © 2016-2022 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.attributes;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import static org.thingsboard.server.common.data.CacheConstants.ATTRIBUTES_CACHE;
@Service
@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "true")
@Primary
@Slf4j
public class DefaultAttributesCacheWrapper implements AttributesCacheWrapper {
private final Cache attributesCache;
public DefaultAttributesCacheWrapper(CacheManager cacheManager) {
this.attributesCache = cacheManager.getCache(ATTRIBUTES_CACHE);
}
@Override
public Cache.ValueWrapper get(AttributeCacheKey attributeCacheKey) {
var result = attributesCache.get(attributeCacheKey);
log.warn("[{}] Get = {}", attributeCacheKey, result);
return result;
}
@Override
public void put(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) {
log.warn("[{}] Put = {}", attributeCacheKey, attributeKvEntry);
attributesCache.put(attributeCacheKey, attributeKvEntry);
}
@Override
public void putIfAbsent(AttributeCacheKey attributeCacheKey, AttributeKvEntry attributeKvEntry) {
var result = attributesCache.putIfAbsent(attributeCacheKey, attributeKvEntry);
log.warn("[{}] Put if absent = {}, result = {}", attributeCacheKey, attributeKvEntry, result);
}
@Override
public void evict(AttributeCacheKey attributeCacheKey) {
log.warn("[{}] Evict", attributeCacheKey);
attributesCache.evict(attributeCacheKey);
}
}

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -29,6 +29,7 @@ import org.springframework.cache.annotation.Cacheable;
import org.springframework.cache.annotation.Caching;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
@ -48,6 +49,8 @@ import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.ConstraintValidator;
import javax.annotation.Nullable;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@ -86,11 +89,7 @@ public class BaseRelationService implements RelationService {
@Cacheable(cacheNames = RELATIONS_CACHE, key = "{#from, #to, #relationType, #typeGroup}")
@Override
public EntityRelation getRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
try {
return getRelationAsync(tenantId, from, to, relationType, typeGroup).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return getRelation(tenantId, from, to, relationType, typeGroup);
}
@Override
@ -108,6 +107,7 @@ public class BaseRelationService implements RelationService {
@CacheEvict(cacheNames = RELATIONS_CACHE, key = "{#relation.to, #relation.type, #relation.typeGroup, 'TO'}")
})
@Override
@Transactional
public boolean saveRelation(TenantId tenantId, EntityRelation relation) {
log.trace("Executing saveRelation [{}]", relation);
validate(relation);
@ -181,6 +181,7 @@ public class BaseRelationService implements RelationService {
public ListenableFuture<Boolean> deleteRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {
log.trace("Executing deleteRelationAsync [{}][{}][{}][{}]", from, to, relationType, typeGroup);
validate(from, to, relationType, typeGroup);
//TODO: clear cache using transform
return relationDao.deleteRelationAsync(tenantId, from, to, relationType, typeGroup);
}
@ -319,11 +320,8 @@ public class BaseRelationService implements RelationService {
public List<EntityRelation> findByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup) {
validate(from);
validateTypeGroup(typeGroup);
try {
return relationDao.findAllByFromAsync(tenantId, from, typeGroup).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
log.trace("[{}] Find by from: [{}][{}]: ", tenantId, from, typeGroup, new RuntimeException());
return relationDao.findAllByFrom(tenantId, from, typeGroup);
}
@Override
@ -345,7 +343,7 @@ public class BaseRelationService implements RelationService {
} else {
ListenableFuture<List<EntityRelation>> relationsFuture = relationDao.findAllByFromAsync(tenantId, from, typeGroup);
Futures.addCallback(relationsFuture,
new FutureCallback<List<EntityRelation>>() {
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EntityRelation> result) {
cache.putIfAbsent(fromAndTypeGroup, result);
@ -401,11 +399,7 @@ public class BaseRelationService implements RelationService {
public List<EntityRelation> findByTo(TenantId tenantId, EntityId to, RelationTypeGroup typeGroup) {
validate(to);
validateTypeGroup(typeGroup);
try {
return relationDao.findAllByToAsync(tenantId, to, typeGroup).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return relationDao.findAllByTo(tenantId, to, typeGroup);
}
@Override

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -66,7 +66,10 @@ public class TbSqlBlockingQueue<E> implements TbSqlQueue<E> {
}
queue.drainTo(entities, batchSize - 1);
boolean fullPack = entities.size() == batchSize;
log.debug("[{}] Going to save {} entities", logName, entities.size());
if (log.isDebugEnabled()) {
log.debug("[{}] Going to save {} entities", logName, entities.size());
log.trace("[{}] Going to save entities: {}", logName, entities);
}
Stream<E> entitiesStream = entities.stream().map(TbSqlQueueElement::getEntity);
saveFunction.accept(
(params.isBatchSortEnabled() ? entitiesStream.sorted(batchUpdateComparator) : entitiesStream)

View File

@ -17,7 +17,9 @@ package org.thingsboard.server.dao.sql;
import com.google.common.util.concurrent.SettableFuture;
import lombok.Getter;
import lombok.ToString;
@ToString(exclude = "future")
public final class TbSqlQueueElement<E> {
@Getter
private final SettableFuture<Void> future;

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -18,6 +18,7 @@ package org.thingsboard.server.dao.sql.attributes;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -39,6 +40,7 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@ -153,7 +155,7 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
}
@Override
public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute) {
public ListenableFuture<String> save(TenantId tenantId, EntityId entityId, String attributeType, AttributeKvEntry attribute) {
AttributeKvEntity entity = new AttributeKvEntity();
entity.setId(new AttributeKvCompositeKey(entityId.getEntityType(), entityId.getId(), attributeType, attribute.getKey()));
entity.setLastUpdateTs(attribute.getLastUpdateTs());
@ -165,18 +167,20 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
return addToQueue(entity);
}
private ListenableFuture<Void> addToQueue(AttributeKvEntity entity) {
return queue.add(entity);
private ListenableFuture<String> addToQueue(AttributeKvEntity entity) {
return Futures.transform(queue.add(entity), v -> entity.getId().getAttributeKey(), MoreExecutors.directExecutor());
}
@Override
public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List<String> keys) {
return service.submit(() -> {
keys.forEach(key ->
attributeKvRepository.delete(entityId.getEntityType(), entityId.getId(), attributeType, key)
);
return null;
});
public List<ListenableFuture<String>> removeAll(TenantId tenantId, EntityId entityId, String attributeType, List<String> keys) {
List<ListenableFuture<String>> futuresList = new ArrayList<>(keys.size());
for (String key : keys) {
futuresList.add(service.submit(() -> {
attributeKvRepository.delete(entityId.getEntityType(), entityId.getId(), attributeType, key);
return key;
}));
}
return futuresList;
}
private AttributeKvCompositeKey getAttributeKvCompositeKey(EntityId entityId, String attributeType, String attributeKey) {

View File

@ -339,12 +339,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
List<Long> highTemperatures = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures);
List<ListenableFuture<List<Void>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<List<String>>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), DataConstants.CLIENT_SCOPE));
}
Futures.successfulAsList(attributeFutures).get();
Futures.allAsList(attributeFutures).get();
RelationsQueryFilter filter = new RelationsQueryFilter();
filter.setRootEntity(tenantId);
@ -518,12 +518,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
List<Long> highTemperatures = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, new ArrayList<>(), new ArrayList<>(), temperatures, highTemperatures);
List<ListenableFuture<List<Void>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<List<String>>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), DataConstants.CLIENT_SCOPE));
}
Futures.successfulAsList(attributeFutures).get();
Futures.allAsList(attributeFutures).get();
DeviceSearchQueryFilter filter = new DeviceSearchQueryFilter();
filter.setRootEntity(tenantId);
@ -593,12 +593,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
List<Long> highConsumptions = new ArrayList<>();
createTestHierarchy(tenantId, assets, devices, consumptions, highConsumptions, new ArrayList<>(), new ArrayList<>());
List<ListenableFuture<List<Void>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<List<String>>> attributeFutures = new ArrayList<>();
for (int i = 0; i < assets.size(); i++) {
Asset asset = assets.get(i);
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), DataConstants.SERVER_SCOPE));
}
Futures.successfulAsList(attributeFutures).get();
Futures.allAsList(attributeFutures).get();
AssetSearchQueryFilter filter = new AssetSearchQueryFilter();
filter.setRootEntity(tenantId);
@ -1048,14 +1048,14 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
}
}
List<ListenableFuture<List<Void>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<List<String>>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
for (String currentScope : DataConstants.allScopes()) {
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), currentScope));
}
}
Futures.successfulAsList(attributeFutures).get();
Futures.allAsList(attributeFutures).get();
DeviceTypeFilter filter = new DeviceTypeFilter();
filter.setDeviceType("default");
@ -1150,12 +1150,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
}
}
List<ListenableFuture<List<Void>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<List<String>>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveLongAttribute(device.getId(), "temperature", temperatures.get(i), DataConstants.CLIENT_SCOPE));
}
Futures.successfulAsList(attributeFutures).get();
Futures.allAsList(attributeFutures).get();
DeviceTypeFilter filter = new DeviceTypeFilter();
filter.setDeviceType("default");
@ -1301,7 +1301,7 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
Device device = devices.get(i);
timeseriesFutures.add(saveLongTimeseries(device.getId(), "temperature", temperatures.get(i)));
}
Futures.successfulAsList(timeseriesFutures).get();
Futures.allAsList(timeseriesFutures).get();
DeviceTypeFilter filter = new DeviceTypeFilter();
filter.setDeviceType("default");
@ -1428,12 +1428,12 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
}
}
List<ListenableFuture<List<Void>>> attributeFutures = new ArrayList<>();
List<ListenableFuture<List<String>>> attributeFutures = new ArrayList<>();
for (int i = 0; i < devices.size(); i++) {
Device device = devices.get(i);
attributeFutures.add(saveStringAttribute(device.getId(), "attributeString", attributeStrings.get(i), DataConstants.CLIENT_SCOPE));
}
Futures.successfulAsList(attributeFutures).get();
Futures.allAsList(attributeFutures).get();
DeviceTypeFilter filter = new DeviceTypeFilter();
filter.setDeviceType("default");
@ -1764,13 +1764,13 @@ public abstract class BaseEntityServiceTest extends AbstractServiceTest {
return filter;
}
private ListenableFuture<List<Void>> saveLongAttribute(EntityId entityId, String key, long value, String scope) {
private ListenableFuture<List<String>> saveLongAttribute(EntityId entityId, String key, long value, String scope) {
KvEntry attrValue = new LongDataEntry(key, value);
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr));
}
private ListenableFuture<List<Void>> saveStringAttribute(EntityId entityId, String key, String value, String scope) {
private ListenableFuture<List<String>> saveStringAttribute(EntityId entityId, String key, String value, String scope) {
KvEntry attrValue = new StringDataEntry(key, value);
AttributeKvEntry attr = new BaseAttributeKvEntry(attrValue, 42L);
return attributesService.save(SYSTEM_TENANT_ID, entityId, scope, Collections.singletonList(attr));

View File

@ -15,7 +15,6 @@
*/
package org.thingsboard.rule.engine.metadata;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.json.JsonWriteFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -24,6 +23,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.JsonParseException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.BooleanUtils;
import org.thingsboard.rule.engine.api.TbContext;
@ -50,6 +50,7 @@ import static org.thingsboard.server.common.data.DataConstants.LATEST_TS;
import static org.thingsboard.server.common.data.DataConstants.SERVER_SCOPE;
import static org.thingsboard.server.common.data.DataConstants.SHARED_SCOPE;
@Slf4j
public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeConfiguration, T extends EntityId> implements TbNode {
private static ObjectMapper mapper = new ObjectMapper();
@ -112,6 +113,7 @@ public abstract class TbAbstractGetAttributesNode<C extends TbGetAttributesNodeC
}
ListenableFuture<List<AttributeKvEntry>> attributeKvEntryListFuture = ctx.getAttributesService().find(ctx.getTenantId(), entityId, scope, keys);
return Futures.transform(attributeKvEntryListFuture, attributeKvEntryList -> {
log.warn("[{}][{}][{}][{}] Lookup attribute result: {}", ctx.getTenantId(), entityId, scope, keys, attributeKvEntryList);
if (!CollectionUtils.isEmpty(attributeKvEntryList)) {
List<AttributeKvEntry> existingAttributesKvEntry = attributeKvEntryList.stream().filter(attributeKvEntry -> keys.contains(attributeKvEntry.getKey())).collect(Collectors.toList());
existingAttributesKvEntry.forEach(kvEntry -> msg.getMetaData().putValue(prefix + kvEntry.getKey(), kvEntry.getValueAsString()));