Merge remote-tracking branch 'upstream/master' into mqttConnectCommandFix

This commit is contained in:
dashevchenko 2023-11-07 12:29:19 +02:00
commit 921a441b9a
7 changed files with 170 additions and 101 deletions

View File

@ -192,7 +192,7 @@ public class DefaultTransportApiService implements TransportApiService {
final String certChain = msg.getCertificateChain(); final String certChain = msg.getCertificateChain();
result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain)); result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain));
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) { } else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()); result = handlerExecutor.submit(() -> handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()));
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) { } else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
result = handle(transportApiRequestMsg.getEntityProfileRequestMsg()); result = handle(transportApiRequestMsg.getEntityProfileRequestMsg());
} else if (transportApiRequestMsg.hasLwM2MRequestMsg()) { } else if (transportApiRequestMsg.hasLwM2MRequestMsg()) {
@ -223,7 +223,6 @@ public class DefaultTransportApiService implements TransportApiService {
} }
private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) { private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
//TODO: Make async and enable caching
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId); DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
if (credentials != null && credentials.getCredentialsType() == credentialsType) { if (credentials != null && credentials.getCredentialsType() == credentialsType) {
return getDeviceInfo(credentials); return getDeviceInfo(credentials);
@ -336,10 +335,9 @@ public class DefaultTransportApiService implements TransportApiService {
return VALID; return VALID;
} }
private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) { private TransportApiResponseMsg handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB())); DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, gatewayId); Device gateway = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, gatewayId);
return Futures.transform(gatewayFuture, gateway -> {
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock()); Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
deviceCreationLock.lock(); deviceCreationLock.lock();
try { try {
@ -405,7 +403,6 @@ public class DefaultTransportApiService implements TransportApiService {
} finally { } finally {
deviceCreationLock.unlock(); deviceCreationLock.unlock();
} }
}, dbCallbackExecutorService);
} }
private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) { private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) {

View File

@ -121,18 +121,47 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
loginSysAdmin(); loginSysAdmin();
AdminSettings adminSettings = doGet("/api/admin/settings/connectivity", AdminSettings.class); ObjectNode config = JacksonUtil.newObjectNode();
JsonNode connectivity = adminSettings.getJsonValue();
((ObjectNode)connectivity.get("http")).put("port", 8080); ObjectNode http = JacksonUtil.newObjectNode();
((ObjectNode)connectivity.get("http")).put("enabled", true); http.put("enabled", true);
((ObjectNode)connectivity.get("https")).put("enabled", true); http.put("host", "");
((ObjectNode)connectivity.get("https")).put("port", 444); http.put("port", 8080);
((ObjectNode)connectivity.get("mqtt")).put("enabled", true); config.set("http", http);
((ObjectNode)connectivity.get("mqtts")).put("enabled", true);
((ObjectNode)connectivity.get("coap")).put("enabled", true); ObjectNode https = JacksonUtil.newObjectNode();
((ObjectNode)connectivity.get("coaps")).put("enabled", true); https.put("enabled", true);
doPost("/api/admin/settings", adminSettings); https.put("host", "");
https.put("port", 444);
config.set("https", https);
ObjectNode mqtt = JacksonUtil.newObjectNode();
mqtt.put("enabled", true);
mqtt.put("host", "");
mqtt.put("port", 1883);
config.set("mqtt", mqtt);
ObjectNode mqtts = JacksonUtil.newObjectNode();
mqtts.put("enabled", true);
mqtts.put("host", "");
mqtts.put("port", 8883);
config.set("mqtts", mqtts);
ObjectNode coap = JacksonUtil.newObjectNode();
coap.put("enabled", true);
coap.put("host", "");
coap.put("port", 5683);
config.set("coap", coap);
ObjectNode coaps = JacksonUtil.newObjectNode();
coaps.put("enabled", true);
coaps.put("host", "");
coaps.put("port", 5684);
config.set("coaps", coaps);
AdminSettings adminSettings = doGet("/api/admin/settings/connectivity", AdminSettings.class);
adminSettings.setJsonValue(config);
doPost("/api/admin/settings", adminSettings).andExpect(status().isOk());
Tenant tenant = new Tenant(); Tenant tenant = new Tenant();
tenant.setTitle("My tenant"); tenant.setTitle("My tenant");

View File

@ -29,6 +29,7 @@ import org.mockito.Mockito;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary; import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.ContextConfiguration;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.Device;
@ -66,16 +67,47 @@ public class DeviceConnectivityControllerWithDefaultPortTest extends AbstractCon
loginSysAdmin(); loginSysAdmin();
AdminSettings adminSettings = doGet("/api/admin/settings/connectivity", AdminSettings.class); ObjectNode config = JacksonUtil.newObjectNode();
JsonNode connectivity = adminSettings.getJsonValue();
((ObjectNode) connectivity.get("http")).put("port", 80); ObjectNode http = JacksonUtil.newObjectNode();
((ObjectNode) connectivity.get("https")).put("enabled", true); http.put("enabled", true);
((ObjectNode) connectivity.get("mqtt")).put("enabled", false); http.put("host", "");
((ObjectNode) connectivity.get("mqtts")).put("enabled", false); http.put("port", 80);
((ObjectNode) connectivity.get("coaps")).put("enabled", false); config.set("http", http);
((ObjectNode) connectivity.get("coap")).put("enabled", false);
doPost("/api/admin/settings", adminSettings); ObjectNode https = JacksonUtil.newObjectNode();
https.put("enabled", true);
https.put("host", "");
https.put("port", 443);
config.set("https", https);
ObjectNode mqtt = JacksonUtil.newObjectNode();
mqtt.put("enabled", false);
mqtt.put("host", "");
mqtt.put("port", 1883);
config.set("mqtt", mqtt);
ObjectNode mqtts = JacksonUtil.newObjectNode();
mqtts.put("enabled", false);
mqtts.put("host", "");
mqtts.put("port", 8883);
config.set("mqtts", mqtts);
ObjectNode coap = JacksonUtil.newObjectNode();
coap.put("enabled", false);
coap.put("host", "");
coap.put("port", 5683);
config.set("coap", coap);
ObjectNode coaps = JacksonUtil.newObjectNode();
coaps.put("enabled", false);
coaps.put("host", "");
coaps.put("port", 5684);
config.set("coaps", coaps);
AdminSettings adminSettings = doGet("/api/admin/settings/connectivity", AdminSettings.class);
adminSettings.setJsonValue(config);
doPost("/api/admin/settings", adminSettings).andExpect(status().isOk());
Tenant tenant = new Tenant(); Tenant tenant = new Tenant();
tenant.setTitle("My tenant"); tenant.setTitle("My tenant");

View File

@ -268,6 +268,13 @@ public class WidgetsBundleControllerTest extends AbstractControllerTest {
Collections.sort(loadedWidgetsBundles2, idComparator); Collections.sort(loadedWidgetsBundles2, idComparator);
Assert.assertEquals(tenantWidgetsBundles, loadedWidgetsBundles2); Assert.assertEquals(tenantWidgetsBundles, loadedWidgetsBundles2);
// cleanup
loginSysAdmin();
for (WidgetsBundle sysWidgetsBundle : sysWidgetsBundles) {
doDelete("/api/widgetsBundle/" + sysWidgetsBundle.getId().getId().toString())
.andExpect(status().isOk());
}
} }
@Test @Test

View File

@ -458,7 +458,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
verify(consumer2, never()).unsubscribe(); verify(consumer2, never()).unsubscribe();
int msgCount = totalConsumedMsgs.get(); int msgCount = totalConsumedMsgs.get();
await().atLeast(4, TimeUnit.SECONDS) // based on topicDeletionDelayInSec await().atLeast(2, TimeUnit.SECONDS) // based on topicDeletionDelayInSec(5) = 5 - ( 3 seconds the code may execute starting consumerManager.delete() call)
.atMost(7, TimeUnit.SECONDS) .atMost(7, TimeUnit.SECONDS)
.untilAsserted(() -> { .untilAsserted(() -> {
partitions.stream() partitions.stream()
@ -498,7 +498,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
verify(consumer, never()).unsubscribe(); verify(consumer, never()).unsubscribe();
int msgCount = totalConsumedMsgs.get(); int msgCount = totalConsumedMsgs.get();
await().atLeast(4, TimeUnit.SECONDS) await().atLeast(2, TimeUnit.SECONDS) // based on topicDeletionDelayInSec(5) = 5 - ( 3 seconds the code may execute starting consumerManager.delete() call)
.atMost(7, TimeUnit.SECONDS) .atMost(7, TimeUnit.SECONDS)
.untilAsserted(() -> { .untilAsserted(() -> {
partitions.stream() partitions.stream()

View File

@ -38,17 +38,22 @@ public class DeviceCacheKey implements Serializable {
this(null, deviceId, null); this(null, deviceId, null);
} }
public DeviceCacheKey(TenantId tenantId, DeviceId deviceId) {
this(tenantId, deviceId, null);
}
public DeviceCacheKey(TenantId tenantId, String deviceName) { public DeviceCacheKey(TenantId tenantId, String deviceName) {
this(tenantId, null, deviceName); this(tenantId, null, deviceName);
} }
@Override @Override
public String toString() { public String toString() {
if (deviceId != null) { if (deviceId == null) {
return tenantId + "_n_" + deviceName;
} else if (tenantId == null) {
return deviceId.toString(); return deviceId.toString();
} else { } else {
return tenantId + "_n_" + deviceName; return tenantId + "_" + deviceId;
} }
} }
} }

View File

@ -130,15 +130,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
public Device findDeviceById(TenantId tenantId, DeviceId deviceId) { public Device findDeviceById(TenantId tenantId, DeviceId deviceId) {
log.trace("Executing findDeviceById [{}]", deviceId); log.trace("Executing findDeviceById [{}]", deviceId);
validateId(deviceId, INCORRECT_DEVICE_ID + deviceId); validateId(deviceId, INCORRECT_DEVICE_ID + deviceId);
return cache.getAndPutInTransaction(new DeviceCacheKey(deviceId),
() -> {
//TODO: possible bug source since sometimes we need to clear cache by tenant id and sometimes by sys tenant id?
if (TenantId.SYS_TENANT_ID.equals(tenantId)) { if (TenantId.SYS_TENANT_ID.equals(tenantId)) {
return deviceDao.findById(tenantId, deviceId.getId()); return cache.getAndPutInTransaction(new DeviceCacheKey(deviceId),
() -> deviceDao.findById(tenantId, deviceId.getId()), true);
} else { } else {
return deviceDao.findDeviceByTenantIdAndId(tenantId, deviceId.getId()); return cache.getAndPutInTransaction(new DeviceCacheKey(tenantId, deviceId),
() -> deviceDao.findDeviceByTenantIdAndId(tenantId, deviceId.getId()), true);
} }
}, true);
} }
@Override @Override
@ -259,6 +257,7 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
keys.add(new DeviceCacheKey(event.getTenantId(), event.getNewName())); keys.add(new DeviceCacheKey(event.getTenantId(), event.getNewName()));
if (event.getDeviceId() != null) { if (event.getDeviceId() != null) {
keys.add(new DeviceCacheKey(event.getDeviceId())); keys.add(new DeviceCacheKey(event.getDeviceId()));
keys.add(new DeviceCacheKey(event.getTenantId(), event.getDeviceId()));
} }
if (StringUtils.isNotEmpty(event.getOldName()) && !event.getOldName().equals(event.getNewName())) { if (StringUtils.isNotEmpty(event.getOldName()) && !event.getOldName().equals(event.getNewName())) {
keys.add(new DeviceCacheKey(event.getTenantId(), event.getOldName())); keys.add(new DeviceCacheKey(event.getTenantId(), event.getOldName()));