Merge branch 'rc' of github.com:thingsboard/thingsboard into rc

This commit is contained in:
Igor Kulikov 2025-03-20 19:16:47 +02:00
commit 9e0cf30fb9
17 changed files with 350 additions and 133 deletions

View File

@ -177,8 +177,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
if (strategy.sendWsUpdate()) {
addWsCallback(resultFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
}
if (strategy.saveLatest()) {
copyLatestToEntityViews(tenantId, entityId, request.getEntries());
if (strategy.saveLatest() && entityId.getEntityType().isOneOf(EntityType.DEVICE, EntityType.ASSET)) {
addMainCallback(resultFuture, __ -> copyLatestToEntityViews(tenantId, entityId, request.getEntries()));
}
return resultFuture;
}
@ -333,58 +333,56 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
private void copyLatestToEntityViews(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts) {
if (EntityType.DEVICE.equals(entityId.getEntityType()) || EntityType.ASSET.equals(entityId.getEntityType())) {
Futures.addCallback(this.tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EntityView> result) {
if (result != null && !result.isEmpty()) {
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
for (TsKvEntry entry : ts) {
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
Futures.addCallback(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId),
new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<EntityView> result) {
if (result != null && !result.isEmpty()) {
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
for (TsKvEntry entry : ts) {
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
}
for (EntityView entityView : result) {
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet());
List<TsKvEntry> entityViewLatest = new ArrayList<>();
long startTs = entityView.getStartTimeMs();
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
for (String key : keys) {
List<TsKvEntry> entries = tsMap.get(key);
if (entries != null) {
Optional<TsKvEntry> tsKvEntry = entries.stream()
.filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
.max(comparingLong(TsKvEntry::getTs));
tsKvEntry.ifPresent(entityViewLatest::add);
}
}
for (EntityView entityView : result) {
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet());
List<TsKvEntry> entityViewLatest = new ArrayList<>();
long startTs = entityView.getStartTimeMs();
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
for (String key : keys) {
List<TsKvEntry> entries = tsMap.get(key);
if (entries != null) {
Optional<TsKvEntry> tsKvEntry = entries.stream()
.filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
.max(comparingLong(TsKvEntry::getTs));
tsKvEntry.ifPresent(entityViewLatest::add);
}
}
if (!entityViewLatest.isEmpty()) {
saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityView.getId())
.entries(entityViewLatest)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {}
if (!entityViewLatest.isEmpty()) {
saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityView.getId())
.entries(entityViewLatest)
.strategy(TimeseriesSaveRequest.Strategy.LATEST_AND_WS)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t);
}
})
.build());
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t);
}
})
.build());
}
}
}
}
@Override
public void onFailure(Throwable t) {
log.error("Error while finding entity views by tenantId and entityId", t);
}
}, MoreExecutors.directExecutor());
}
@Override
public void onFailure(Throwable t) {
log.error("Error while finding entity views by tenantId and entityId", t);
}
}, MoreExecutors.directExecutor());
}
private void onAttributesUpdate(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {

View File

@ -1419,6 +1419,8 @@ device:
host: "${DEVICE_CONNECTIVITY_COAPS_HOST:}"
# Port of coap transport service. If empty, the default port for coaps will be used.
port: "${DEVICE_CONNECTIVITY_COAPS_PORT:5684}"
# Path to the COAP CA root certificate file
pem_cert_file: "${DEVICE_CONNECTIVITY_COAPS_CA_ROOT_CERT:cafile.pem}"
# Edges parameters
edges:

View File

@ -231,16 +231,19 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
credentials.getCredentialsId()));
JsonNode linuxCoapCommands = commands.get(COAP);
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://localhost:5683/api/v1/%s/telemetry " +
"-t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://localhost:5684/api/v1/%s/telemetry" +
" -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST " +
"-t \"application/json\" -e \"{temperature:25}\" coap://localhost:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " +
"-R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://localhost:5684/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway" +
" thingsboard/coap-clients coap-client -v 6 -m POST coap://host.docker.internal:5683/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://host.docker.internal:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway" +
" thingsboard/coap-clients coap-client-openssl -v 6 -m POST coaps://host.docker.internal:5684/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients " +
"/bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download && " +
"coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://host.docker.internal:5684/api/v1/%s/telemetry\"", credentials.getCredentialsId()));
}
@Test
@ -376,16 +379,19 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
credentials.getCredentialsId()));
JsonNode linuxCoapCommands = commands.get(COAP);
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://[::1]:5683/api/v1/%s/telemetry " +
"-t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://[::1]:5684/api/v1/%s/telemetry" +
" -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST " +
"-t \"application/json\" -e \"{temperature:25}\" coap://[::1]:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " +
"-R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://[::1]:5684/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway" +
" thingsboard/coap-clients coap-client -v 6 -m POST coap://host.docker.internal:5683/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://host.docker.internal:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway" +
" thingsboard/coap-clients coap-client-openssl -v 6 -m POST coaps://host.docker.internal:5684/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients " +
"/bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download && " +
"coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://host.docker.internal:5684/api/v1/%s/telemetry\"", credentials.getCredentialsId()));
}
@Test
@ -430,16 +436,19 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
credentials.getCredentialsId()));
JsonNode linuxCoapCommands = commands.get(COAP);
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://[1:1:1:1:1:1:1:1]:5683/api/v1/%s/telemetry " +
"-t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://[1:1:1:1:1:1:1:1]:5684/api/v1/%s/telemetry" +
" -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST " +
"-t \"application/json\" -e \"{temperature:25}\" coap://[1:1:1:1:1:1:1:1]:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM +
" -t \"application/json\" -e \"{temperature:25}\" coaps://[1:1:1:1:1:1:1:1]:5684/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it" +
" thingsboard/coap-clients coap-client -v 6 -m POST coap://[1:1:1:1:1:1:1:1]:5683/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://[1:1:1:1:1:1:1:1]:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it" +
" thingsboard/coap-clients coap-client-openssl -v 6 -m POST coaps://[1:1:1:1:1:1:1:1]:5684/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients " +
"/bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download && " +
"coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://[1:1:1:1:1:1:1:1]:5684/api/v1/%s/telemetry\"", credentials.getCredentialsId()));
}
@ -552,9 +561,10 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
assertThat(commands).hasSize(1);
JsonNode linuxCommands = commands.get(COAP);
assertThat(linuxCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://localhost:5683/api/v1/%s/telemetry -t json -e \"{temperature:25}\"",
assertThat(linuxCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://localhost:5683/api/v1/%s/telemetry",
credentials.getCredentialsId()));
assertThat(linuxCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://localhost:5684/api/v1/%s/telemetry -t json -e \"{temperature:25}\"",
assertThat(linuxCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(linuxCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://localhost:5684/api/v1/%s/telemetry",
credentials.getCredentialsId()));
}
@ -772,16 +782,18 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
"mosquitto_pub -d -q 1 --cafile " + CA_ROOT_CERT_PEM + " -h host.docker.internal -t v1/devices/me/telemetry -u \"%s\" -m \"{temperature:25}\"\"", credentials.getCredentialsId()));
JsonNode linuxCoapCommands = commands.get(COAP);
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://localhost/api/v1/%s/telemetry " +
"-t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://localhost/api/v1/%s/telemetry" +
" -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST " +
"-t \"application/json\" -e \"{temperature:25}\" coap://localhost/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " +
"-R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://localhost/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway" +
" thingsboard/coap-clients coap-client -v 6 -m POST coap://host.docker.internal/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://host.docker.internal/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway" +
" thingsboard/coap-clients coap-client-openssl -v 6 -m POST coaps://host.docker.internal/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
" thingsboard/coap-clients /bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download && " +
"coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://host.docker.internal/api/v1/%s/telemetry\"", credentials.getCredentialsId()));
}
@Test
@ -831,16 +843,18 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
credentials.getCredentialsId()));
JsonNode linuxCoapCommands = commands.get(COAP);
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://test.domain:5683/api/v1/%s/telemetry " +
"-t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://test.domain:5684/api/v1/%s/telemetry" +
" -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST " +
"-t \"application/json\" -e \"{temperature:25}\" coap://test.domain:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(linuxCoapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(linuxCoapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " +
"-R "+ CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://test.domain:5684/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = commands.get(COAP).get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it " +
"thingsboard/coap-clients coap-client -v 6 -m POST coap://test.domain:5683/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
"thingsboard/coap-clients coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://test.domain:5683/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it " +
"thingsboard/coap-clients coap-client-openssl -v 6 -m POST coaps://test.domain:5684/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
"thingsboard/coap-clients /bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download && " +
"coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://test.domain:5684/api/v1/%s/telemetry\"", credentials.getCredentialsId()));
}
@Test
@ -917,12 +931,17 @@ public class DeviceConnectivityControllerTest extends AbstractControllerTest {
assertThat(dockerMqttCommands.get(MQTTS).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway thingsboard/mosquitto-clients /bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/mqtts/certificate/download && mosquitto_pub -d -q 1 --cafile " + CA_ROOT_CERT_PEM + " -h host.docker.internal -t v1/devices/me/telemetry -u \"%s\" -m \"{temperature:25}\"\"", credentials.getCredentialsId()));
JsonNode coapCommands = commands.get(COAP);
assertThat(coapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST coap://localhost/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(coapCommands.get(COAPS).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST coaps://localhost/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(coapCommands.get(COAP).asText()).isEqualTo(String.format("coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://localhost/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(coapCommands.get(COAPS).get(0).asText()).isEqualTo("curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download");
assertThat(coapCommands.get(COAPS).get(1).asText()).isEqualTo(String.format("coap-client-openssl -v 6 -m POST " +
"-R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://localhost/api/v1/%s/telemetry", credentials.getCredentialsId()));
JsonNode dockerCoapCommands = coapCommands.get(DOCKER);
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway thingsboard/coap-clients coap-client -v 6 -m POST coap://host.docker.internal/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway thingsboard/coap-clients coap-client-openssl -v 6 -m POST coaps://host.docker.internal/api/v1/%s/telemetry -t json -e \"{temperature:25}\"", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAP).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway " +
"thingsboard/coap-clients coap-client -v 6 -m POST -t \"application/json\" -e \"{temperature:25}\" coap://host.docker.internal/api/v1/%s/telemetry", credentials.getCredentialsId()));
assertThat(dockerCoapCommands.get(COAPS).asText()).isEqualTo(String.format("docker run --rm -it --add-host=host.docker.internal:host-gateway " +
"thingsboard/coap-clients /bin/sh -c \"curl -f -S -o " + CA_ROOT_CERT_PEM + " http://localhost:80/api/device-connectivity/coaps/certificate/download && " +
"coap-client-openssl -v 6 -m POST -R " + CA_ROOT_CERT_PEM + " -t \"application/json\" -e \"{temperature:25}\" coaps://host.docker.internal/api/v1/%s/telemetry\"", credentials.getCredentialsId()));
}

View File

@ -0,0 +1,130 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.TestPropertySource;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.edqs.EdqsSyncRequest;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.query.EntityKeyValueType;
import org.thingsboard.server.common.data.query.FilterPredicateValue;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.StringFilterPredicate;
import org.thingsboard.server.dao.service.DaoSqlTest;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static org.awaitility.Awaitility.await;
@DaoSqlTest
@TestPropertySource(properties = {
"queue.edqs.sync.enabled=true",
"queue.edqs.api.supported=true",
"queue.edqs.api.auto_enable=true",
"queue.edqs.mode=local"
})
public class EdqsControllerTest extends AbstractControllerTest {
@Autowired
private JdbcTemplate jdbcTemplate;
@Before
public void beforeEdqsControllerTest() throws Exception {
loginTenantAdmin();
}
@Test
public void testEdqsSync() throws Exception {
List<Device> devices = new ArrayList<>();
for (int i = 0; i < 3; i++) {
Device device = new Device();
device.setName("Device" + i);
device.setType("default");
device.setLabel("testLabel" + (int) (Math.random() * 1000));
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put("gateway", true);
device.setAdditionalInfo(additionalInfo);
devices.add(doPost("/api/device", device, Device.class));
Thread.sleep(1);
}
DeviceTypeFilter filter = new DeviceTypeFilter();
filter.setDeviceTypes(List.of("default"));
filter.setDeviceNameFilter("");
List<EntityKey> entityFields = Collections.singletonList(new EntityKey(EntityKeyType.ENTITY_FIELD, "name"));
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null);
EntityDataQuery query = new EntityDataQuery(filter, pageLink, entityFields, null, Collections.singletonList(getGatewayFilter()));
await().atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() -> doPostWithTypedResponse("/api/entitiesQuery/find", query, new TypeReference<PageData<EntityData>>() {
}), result -> result.getTotalElements() == 3);
// update db
Device device1 = devices.get(0);
device1.setAdditionalInfo(JacksonUtil.newObjectNode());
jdbcTemplate.execute("update device set additional_info = '{}' where id = '" + device1.getId().getId().toString() + "'");
// do edqs sync
loginSysAdmin();
ToCoreEdqsRequest syncRequest = new ToCoreEdqsRequest(new EdqsSyncRequest(), null);
doPost("/api/edqs/system/request", syncRequest);
//check sync is finished
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> {
Optional<AttributeKvEntry> attribute = attributesService.find(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, AttributeScope.SERVER_SCOPE, "edqsSyncState").get();
return attribute.isPresent() && attribute.get().getJsonValue().isPresent() &&
attribute.get().getJsonValue().get().contains("\"status\":\"FINISHED\"");
});
// check if the count is updated
loginTenantAdmin();
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> doPostWithTypedResponse("/api/entitiesQuery/find", query, new TypeReference<PageData<EntityData>>() {
}), result -> result.getTotalElements() == 2);
}
private KeyFilter getGatewayFilter() {
KeyFilter additionalInfoFilter = new KeyFilter();
additionalInfoFilter.setKey(new EntityKey(EntityKeyType.ENTITY_FIELD, "additionalInfo"));
additionalInfoFilter.setValueType(EntityKeyValueType.STRING);
StringFilterPredicate predicate = new StringFilterPredicate();
predicate.setValue(FilterPredicateValue.fromString("\"gateway\":true"));
predicate.setOperation(StringFilterPredicate.StringOperation.CONTAINS);
additionalInfoFilter.setPredicate(predicate);
return additionalInfoFilter;
}
}

View File

@ -359,6 +359,45 @@ class DefaultTelemetrySubscriptionServiceTest {
then(subscriptionManagerService).shouldHaveNoInteractions();
}
@Test
void shouldNotCopyLatestToEntityViewWhenTimeseriesSaveFailedOnMainEntity() {
// GIVEN
var entityView = new EntityView(new EntityViewId(UUID.randomUUID()));
entityView.setTenantId(tenantId);
entityView.setCustomerId(customerId);
entityView.setEntityId(entityId);
entityView.setKeys(new TelemetryEntityView(sampleTimeseries.stream().map(KvEntry::getKey).toList(), new AttributesEntityView()));
// mock that there is one entity view
lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId)).thenReturn(immediateFuture(List.of(entityView)));
// mock that save latest call for entity view is successful
lenient().when(tsService.saveLatest(tenantId, entityView.getId(), sampleTimeseries)).thenReturn(immediateFuture(TimeseriesSaveResult.of(sampleTimeseries.size(), listOfNNumbers(sampleTimeseries.size()))));
// mock TPI for entity view
lenient().when(partitionService.resolve(ServiceType.TB_CORE, tenantId, entityView.getId())).thenReturn(tpi);
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(entityId)
.entries(sampleTimeseries)
.ttl(sampleTtl)
.strategy(new TimeseriesSaveRequest.Strategy(true, true, false, false))
.build();
given(tsService.save(tenantId, entityId, sampleTimeseries, sampleTtl)).willReturn(immediateFailedFuture(new RuntimeException("failed to save data on main entity")));
// WHEN
telemetryService.saveTimeseries(request);
// THEN
// should save only time series for the main entity
then(tsService).should().save(tenantId, entityId, sampleTimeseries, sampleTtl);
then(tsService).shouldHaveNoMoreInteractions();
// should not send any WS updates
then(subscriptionManagerService).shouldHaveNoInteractions();
}
@ParameterizedTest
@MethodSource("allCombinationsOfFourBooleans")
void shouldCallCorrectSaveTimeseriesApiBasedOnBooleanFlagsInTheSaveRequest(boolean saveTimeseries, boolean saveLatest, boolean sendWsUpdate, boolean processCalculatedFields) {

View File

@ -97,4 +97,5 @@ public enum EntityType {
}
return false;
}
}

View File

@ -35,12 +35,12 @@ public class ApiUsageStateData extends BaseEntityData<ApiUsageStateFields> {
@Override
public String getEntityName() {
return getEntityOwnerName();
return getOwnerName();
}
@Override
public String getEntityOwnerName() {
return repo.getOwnerName(fields.getEntityId());
public String getOwnerName() {
return repo.getOwnerEntityName(fields.getEntityId());
}
}

View File

@ -98,8 +98,13 @@ public abstract class BaseEntityData<T extends EntityFields> implements EntityDa
}
@Override
public EntityType getOwnerType() {
return customerId != null ? EntityType.CUSTOMER : EntityType.TENANT;
public String getOwnerName() {
return repo.getOwnerEntityName(isTenantEntity() ? repo.getTenantId() : new CustomerId(getCustomerId()));
}
@Override
public String getOwnerType() {
return isTenantEntity() ? EntityType.TENANT.name() : EntityType.CUSTOMER.name();
}
@Override
@ -132,22 +137,21 @@ public abstract class BaseEntityData<T extends EntityFields> implements EntityDa
}
return switch (name) {
case "name" -> getEntityName();
case "ownerName" -> getEntityOwnerName();
case "ownerType" -> customerId != null ? EntityType.CUSTOMER.name() : EntityType.TENANT.name();
case "ownerName" -> getOwnerName();
case "ownerType" -> getOwnerType();
case "entityType" -> Optional.ofNullable(getEntityType()).map(EntityType::name).orElse("");
default -> fields.getAsString(name);
};
}
public String getEntityOwnerName() {
return repo.getOwnerName(getCustomerId() == null || CustomerId.NULL_UUID.equals(getCustomerId()) ? null :
new CustomerId(getCustomerId()));
}
public String getEntityName() {
return getFields().getName();
}
private boolean isTenantEntity() {
return getCustomerId() == null || CustomerId.NULL_UUID.equals(getCustomerId());
}
private String getRelatedParentId(QueryContext ctx) {
return Optional.ofNullable(ctx.getRelatedParentIdMap().get(getId()))
.map(UUID::toString)

View File

@ -54,7 +54,9 @@ public interface EntityData<T extends EntityFields> {
boolean removeTs(Integer keyId);
EntityType getOwnerType();
String getOwnerName();
String getOwnerType();
DataPoint getDataPoint(DataKey key, QueryContext queryContext);

View File

@ -421,14 +421,7 @@ public class TenantRepo {
return relations.computeIfAbsent(relationTypeGroup, type -> new RelationsRepo());
}
public String getOwnerName(EntityId ownerId) {
if (ownerId == null || (ownerId.getEntityType() == EntityType.CUSTOMER && ownerId.isNullUid())) {
return getOwnerEntityName(tenantId);
}
return getOwnerEntityName(ownerId);
}
private String getOwnerEntityName(EntityId entityId) {
public String getOwnerEntityName(EntityId entityId) {
EntityType entityType = entityId.getEntityType();
return switch (entityType) {
case CUSTOMER, TENANT -> {

View File

@ -67,10 +67,10 @@ import static org.thingsboard.server.common.data.query.ComplexFilterPredicate.Co
@Slf4j
public class RepositoryUtils {
public static final Comparator<SortableEntityData> SORT_ASC = Comparator.comparing((SortableEntityData sed) -> Optional.ofNullable(sed.getSortValue()).orElse(""))
public static final Comparator<SortableEntityData> SORT_ASC = Comparator.comparing((SortableEntityData sed) -> Optional.ofNullable(sed.getSortValue()).orElse(""), String.CASE_INSENSITIVE_ORDER)
.thenComparing(sp -> sp.getId().toString());
public static final Comparator<SortableEntityData> SORT_DESC = Comparator.comparing((SortableEntityData sed) -> Optional.ofNullable(sed.getSortValue()).orElse(""))
public static final Comparator<SortableEntityData> SORT_DESC = Comparator.comparing((SortableEntityData sed) -> Optional.ofNullable(sed.getSortValue()).orElse(""), String.CASE_INSENSITIVE_ORDER)
.thenComparing(sp -> sp.getId().toString()).reversed();
public static EntityType resolveEntityType(EntityFilter entityFilter) {

View File

@ -26,19 +26,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class VersionsStore {
private final Cache<String, Long> versions = Caffeine.newBuilder()
.expireAfterWrite(1, TimeUnit.HOURS)
.expireAfterWrite(24, TimeUnit.HOURS)
.build();
public boolean isNew(String key, Long version) {
AtomicBoolean isNew = new AtomicBoolean(false);
versions.asMap().compute(key, (k, prevVersion) -> {
if (prevVersion == null || prevVersion < version) {
if (prevVersion == null || prevVersion <= version) {
isNew.set(true);
return version;
} else {
if (version < prevVersion) {
log.info("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
}
log.info("[{}] Version {} is outdated, the latest is {}", key, version, prevVersion);
return prevVersion;
}
});

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.security.DeviceCredentialsType;
import org.thingsboard.server.dao.settings.AdminSettingsService;
import org.thingsboard.server.dao.util.DeviceConnectivityUtil;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
@ -83,6 +84,8 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
@Value("${device.connectivity.mqtts.pem_cert_file:}")
private String mqttsPemCertFile;
@Value("${device.connectivity.coaps.pem_cert_file:}")
private String coapsPemCertFile;
@Override
public JsonNode findDevicePublishTelemetryCommands(String baseUrl, Device device) throws URISyntaxException {
@ -133,22 +136,19 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
public Resource getPemCertFile(String protocol) {
return certs.computeIfAbsent(protocol, key -> {
DeviceConnectivityInfo connectivity = getConnectivity(protocol);
if (!MQTTS.equals(protocol) || connectivity == null) {
if (connectivity == null) {
log.warn("Unknown connectivity protocol: {}", protocol);
return null;
}
if (StringUtils.isNotBlank(mqttsPemCertFile) && ResourceUtils.resourceExists(this, mqttsPemCertFile)) {
try {
return getCert(mqttsPemCertFile);
} catch (Exception e) {
String msg = String.format("Failed to read %s server certificate!", protocol);
log.warn(msg);
throw new RuntimeException(msg, e);
return switch (protocol) {
case COAPS -> getCert(coapsPemCertFile);
case MQTTS -> getCert(mqttsPemCertFile);
default -> {
log.warn("Unsupported secure protocol: {}", protocol);
yield null;
}
} else {
return null;
}
};
});
}
@ -174,7 +174,11 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
return info != null && info.isEnabled();
}
private Resource getCert(String path) throws Exception {
private Resource getCert(String path) {
if (StringUtils.isBlank(path) || !ResourceUtils.resourceExists(this, path)) {
return null;
}
StringBuilder pemContentBuilder = new StringBuilder();
try (InputStream inStream = ResourceUtils.getInputStream(this, path);
@ -197,6 +201,10 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
pemContentBuilder.append("-----END CERTIFICATE-----\n");
}
}
} catch (Exception e) {
String msg = String.format("Failed to read %s server certificate!", path);
log.warn(msg);
throw new RuntimeException(msg, e);
}
return new ByteArrayResource(pemContentBuilder.toString().getBytes(StandardCharsets.UTF_8));
@ -311,8 +319,11 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
}
if (isEnabled(COAPS)) {
ArrayNode coapsCommands = coapCommands.putArray(COAPS);
Optional.ofNullable(DeviceConnectivityUtil.getCurlPemCertCommand(baseUrl, COAPS))
.ifPresent(coapsCommands::add);
Optional.ofNullable(getCoapPublishCommand(COAPS, baseUrl, deviceCredentials))
.ifPresent(v -> coapCommands.put(COAPS, v));
.ifPresent(coapsCommands::add);
Optional.ofNullable(getDockerCoapPublishCommand(COAPS, baseUrl, deviceCredentials))
.ifPresent(v -> dockerCoapCommands.put(COAPS, v));
@ -336,7 +347,7 @@ public class DeviceConnectivityServiceImpl implements DeviceConnectivityService
DeviceConnectivityInfo properties = getConnectivity(protocol);
String host = getHost(baseUrl, properties, protocol);
String port = StringUtils.isBlank(properties.getPort()) ? "" : ":" + properties.getPort();
return DeviceConnectivityUtil.getDockerCoapPublishCommand(protocol, host, port, deviceCredentials);
return DeviceConnectivityUtil.getDockerCoapPublishCommand(protocol, baseUrl, host, port, deviceCredentials);
}
}

View File

@ -199,19 +199,39 @@ public class DeviceConnectivityUtil {
switch (deviceCredentials.getCredentialsType()) {
case ACCESS_TOKEN:
String client = COAPS.equals(protocol) ? "coap-client-openssl" : "coap-client";
return String.format("%s -v 6 -m POST %s://%s%s/api/v1/%s/telemetry -t json -e %s",
client, protocol, host, port, deviceCredentials.getCredentialsId(), JSON_EXAMPLE_PAYLOAD);
String certificate = COAPS.equals(protocol) ? " -R " + CA_ROOT_CERT_PEM : "";
return String.format("%s -v 6 -m POST%s -t \"application/json\" -e %s %s://%s%s/api/v1/%s/telemetry",
client, certificate, JSON_EXAMPLE_PAYLOAD, protocol, host, port, deviceCredentials.getCredentialsId());
default:
return null;
}
}
public static String getDockerCoapPublishCommand(String protocol, String host, String port, DeviceCredentials deviceCredentials) {
public static String getDockerCoapPublishCommand(String protocol, String baseUrl, String host, String port, DeviceCredentials deviceCredentials) {
String coapCommand = getCoapPublishCommand(protocol, host, port, deviceCredentials);
if (coapCommand != null && isLocalhost(host)) {
if (coapCommand == null) {
return null;
}
StringBuilder coapDockerCommand = new StringBuilder();
coapDockerCommand.append(DOCKER_RUN).append(isLocalhost(host) ? ADD_DOCKER_INTERNAL_HOST : "").append(COAP_IMAGE);
if (isLocalhost(host)) {
coapCommand = coapCommand.replace(host, HOST_DOCKER_INTERNAL);
}
return coapCommand != null ? String.format("%s%s%s", DOCKER_RUN + (isLocalhost(host) ? ADD_DOCKER_INTERNAL_HOST : ""), COAP_IMAGE, coapCommand) : null;
if (COAPS.equals(protocol)) {
coapDockerCommand.append("/bin/sh -c \"")
.append(getCurlPemCertCommand(baseUrl, protocol))
.append(" && ")
.append(coapCommand)
.append("\"");
} else {
coapDockerCommand.append(coapCommand);
}
return coapDockerCommand.toString();
}
public static String getHost(String baseUrl, DeviceConnectivityInfo properties, String protocol) throws URISyntaxException {

View File

@ -36,7 +36,7 @@
<pkg.disabled>false</pkg.disabled>
<pkg.process-resources.phase>process-resources</pkg.process-resources.phase>
<pkg.package.phase>package</pkg.package.phase>
<pkg.name>edqs</pkg.name>
<pkg.name>tb-edqs</pkg.name>
<pkg.win.dist>${project.build.directory}/windows</pkg.win.dist>
<pkg.copyInstallScripts>true</pkg.copyInstallScripts>
<pkg.implementationTitle>ThingsBoard Entity Data Query Service</pkg.implementationTitle>

View File

@ -34,7 +34,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.dir>${basedir}/../..</main.dir>
<pkg.name>edqs</pkg.name>
<pkg.name>tb-edqs</pkg.name>
<docker.name>tb-edqs</docker.name>
<pkg.logFolder>/var/log/${pkg.name}</pkg.logFolder>
<pkg.installFolder>/usr/share/${pkg.name}</pkg.installFolder>