Fix race conditions in WS test client

This commit is contained in:
Andrii Shvaika 2022-05-17 12:49:54 +03:00
parent 02df48cfbe
commit 4f6ef91d33
3 changed files with 18 additions and 9 deletions

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -60,12 +60,12 @@ public class TbTestWebSocketClient extends WebSocketClient {
public void onMessage(String s) { public void onMessage(String s) {
log.info("RECEIVED: {}", s); log.info("RECEIVED: {}", s);
lastMsg = s; lastMsg = s;
if (reply != null) {
reply.countDown();
}
if (update != null) { if (update != null) {
update.countDown(); update.countDown();
} }
if (reply != null) {
reply.countDown();
}
} }
@Override @Override

View File

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
* You may obtain a copy of the License at * You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
@ -35,6 +35,7 @@ import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType; import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.SingleEntityFilter;
import org.thingsboard.server.gen.transport.TransportApiProtos; import org.thingsboard.server.gen.transport.TransportApiProtos;
import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate;
@ -319,7 +320,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
protected void processJsonTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception { protected void processJsonTestRequestAttributesValuesFromTheServer(String attrPubTopic, String attrSubTopic, String attrReqTopicPrefix) throws Exception {
MqttTestClient client = new MqttTestClient(); MqttTestClient client = new MqttTestClient();
client.connectAndWait(accessToken); client.connectAndWait(accessToken);
DeviceTypeFilter dtf = new DeviceTypeFilter(savedDevice.getType(), savedDevice.getName()); SingleEntityFilter dtf = new SingleEntityFilter();
dtf.setSingleEntity(savedDevice.getId());
String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson";
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> clientKeysList = List.of(clientKeysStr.split(",")); List<String> clientKeysList = List.of(clientKeysStr.split(","));
@ -389,7 +391,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
100); 100);
assertNotNull(device); assertNotNull(device);
DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName()); SingleEntityFilter dtf = new SingleEntityFilter();
dtf.setSingleEntity(device.getId());
String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson"; String clientKeysStr = "clientStr,clientBool,clientDbl,clientLong,clientJson";
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> clientKeysList = List.of(clientKeysStr.split(",")); List<String> clientKeysList = List.of(clientKeysStr.split(","));
@ -443,7 +446,8 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
100); 100);
assertNotNull(device); assertNotNull(device);
DeviceTypeFilter dtf = new DeviceTypeFilter(device.getType(), device.getName()); SingleEntityFilter dtf = new SingleEntityFilter();
dtf.setSingleEntity(device.getId());
String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson"; String sharedKeysStr = "sharedStr,sharedBool,sharedDbl,sharedLong,sharedJson";
List<String> sharedKeysList = List.of(sharedKeysStr.split(",")); List<String> sharedKeysList = List.of(sharedKeysStr.split(","));
List<EntityKey> csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE); List<EntityKey> csKeys = getEntityKeys(clientKeysList, CLIENT_ATTRIBUTE);
@ -569,7 +573,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes())); assertEquals(JacksonUtil.toJsonNode(expectedRequestPayload), JacksonUtil.fromBytes(callback.getPayloadBytes()));
} }
protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException { protected void validateProtoClientResponseGateway(MqttTestCallback callback, String deviceName) throws InterruptedException, InvalidProtocolBufferException {
callback.getSubscribeLatch().await(3, TimeUnit.SECONDS); callback.getSubscribeLatch().await(3, TimeUnit.SECONDS);
assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS()); assertEquals(MqttQoS.AT_LEAST_ONCE.value(), callback.getQoS());
TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true); TransportApiProtos.GatewayAttributeResponseMsg expectedGatewayAttributeResponseMsg = getExpectedGatewayAttributeResponseMsg(deviceName, true);

View File

@ -141,6 +141,7 @@ public class CachedAttributesService implements AttributesService {
.filter(Objects::nonNull) .filter(Objects::nonNull)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (wrappedCachedAttributes.size() == attributeKeys.size()) { if (wrappedCachedAttributes.size() == attributeKeys.size()) {
log.trace("[{}][{}] Found all attributes from cache: {}", entityId, scope, attributeKeys);
return Futures.immediateFuture(cachedAttributes); return Futures.immediateFuture(cachedAttributes);
} }
@ -152,6 +153,7 @@ public class CachedAttributesService implements AttributesService {
return cacheExecutor.submit(() -> { return cacheExecutor.submit(() -> {
var cacheTransaction = cache.newTransactionForKeys(notFoundKeys); var cacheTransaction = cache.newTransactionForKeys(notFoundKeys);
try { try {
log.trace("[{}][{}] Lookup attributes from db: {}", entityId, scope, notFoundAttributeKeys);
List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys); List<AttributeKvEntry> result = attributesDao.find(tenantId, entityId, scope, notFoundAttributeKeys);
for (AttributeKvEntry foundInDbAttribute : result) { for (AttributeKvEntry foundInDbAttribute : result) {
AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey()); AttributeCacheKey attributeCacheKey = new AttributeCacheKey(scope, entityId, foundInDbAttribute.getKey());
@ -164,6 +166,7 @@ public class CachedAttributesService implements AttributesService {
List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes); List<AttributeKvEntry> mergedAttributes = new ArrayList<>(cachedAttributes);
mergedAttributes.addAll(result); mergedAttributes.addAll(result);
cacheTransaction.commit(); cacheTransaction.commit();
log.trace("[{}][{}] Commit cache transaction: {}", entityId, scope, notFoundAttributeKeys);
return mergedAttributes; return mergedAttributes;
} catch (Throwable e) { } catch (Throwable e) {
cacheTransaction.rollback(); cacheTransaction.rollback();
@ -211,7 +214,9 @@ public class CachedAttributesService implements AttributesService {
for (var attribute : attributes) { for (var attribute : attributes) {
ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute); ListenableFuture<String> future = attributesDao.save(tenantId, entityId, scope, attribute);
futures.add(Futures.transform(future, key -> { futures.add(Futures.transform(future, key -> {
log.trace("[{}][{}][{}] Before cache evict: {}", entityId, scope, key, attribute);
cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute); cache.evictOrPut(new AttributeCacheKey(scope, entityId, key), attribute);
log.trace("[{}][{}][{}] after cache evict.", entityId, scope, key);
return key; return key;
}, cacheExecutor)); }, cacheExecutor));
} }