From dae9a422a15e02999ca6447d3c3aa0375a3f37d8 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Wed, 12 Apr 2023 16:44:24 +0300 Subject: [PATCH 1/3] Fix entities ids replaced with tenant id in dashboard config when importing --- .../impl/BaseEntityImportService.java | 46 +++++++--------- .../impl/DashboardImportService.java | 4 +- .../DefaultEntitiesVersionControlService.java | 6 +- .../sync/ie/ExportImportServiceSqlTest.java | 55 ++++++++++++------- monitoring/src/main/conf/logback.xml | 7 ++- 5 files changed, 66 insertions(+), 52 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index f72ac8a11f..610f1ee0b5 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -23,7 +23,6 @@ import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; -import org.springframework.transaction.annotation.Transactional; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; @@ -87,7 +86,6 @@ public abstract class BaseEntityImportService importEntity(EntitiesImportCtx ctx, D exportData) throws ThingsboardException { EntityImportResult importResult = new EntityImportResult<>(); @@ -336,27 +334,37 @@ public abstract class BaseEntityImportService externalIdOpt = buildEntityId(entityType, externalUuid); - if (!externalIdOpt.isPresent()) { + Optional externalId = buildEntityId(entityType, externalUuid); + if (externalId.isEmpty()) { continue; } - EntityId internalId = ctx.getInternalId(externalIdOpt.get()); + EntityId internalId = ctx.getInternalId(externalId.get()); if (internalId != null) { return Optional.of(internalId); } } if (fetchAllUUIDs) { - for (EntityType entityType : hints) { - Optional internalId = lookupInDb(externalUuid, entityType); - if (internalId.isPresent()) return internalId; - } + Set processLast = Set.of(EntityType.TENANT); + List entityTypes = new ArrayList<>(hints); for (EntityType entityType : EntityType.values()) { - if (hints.contains(entityType)) { + if (!hints.contains(entityType) && !processLast.contains(entityType)) { + entityTypes.add(entityType); + } + } + entityTypes.addAll(processLast); + + for (EntityType entityType : entityTypes) { + Optional externalId = buildEntityId(entityType, externalUuid); + if (externalId.isEmpty() || ctx.isNotFound(externalId.get())) { continue; } - Optional internalId = lookupInDb(externalUuid, entityType); - if (internalId.isPresent()) return internalId; + EntityId internalId = getInternalId(externalId.get(), false); + if (internalId != null) { + return Optional.of(internalId); + } else { + ctx.registerNotFound(externalId.get()); + } } } @@ -364,20 +372,6 @@ public abstract class BaseEntityImportService lookupInDb(UUID externalUuid, EntityType entityType) { - Optional externalIdOpt = buildEntityId(entityType, externalUuid); - if (externalIdOpt.isEmpty() || ctx.isNotFound(externalIdOpt.get())) { - return Optional.empty(); - } - EntityId internalId = getInternalId(externalIdOpt.get(), false); - if (internalId != null) { - return Optional.of(internalId); - } else { - ctx.registerNotFound(externalIdOpt.get()); - } - return Optional.empty(); - } - private Optional buildEntityId(EntityType entityType, UUID externalUuid) { try { return Optional.of(EntityIdFactory.getByTypeAndUuid(entityType, externalUuid)); diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java index 611b536808..250026c71c 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/DashboardImportService.java @@ -65,10 +65,10 @@ public class DashboardImportService extends BaseEntityImportService exportData, IdProvider idProvider) { for (JsonNode entityAlias : dashboard.getEntityAliasesConfig()) { - replaceIdsRecursively(ctx, idProvider, entityAlias, Collections.emptySet(), HINTS); + replaceIdsRecursively(ctx, idProvider, entityAlias, Set.of("id"), HINTS); } for (JsonNode widgetConfig : dashboard.getWidgetsConfig()) { - replaceIdsRecursively(ctx, idProvider, JacksonUtil.getSafely(widgetConfig, "config", "actions"), Collections.singleton("id"), HINTS); + replaceIdsRecursively(ctx, idProvider, JacksonUtil.getSafely(widgetConfig, "config", "actions"), Set.of("id"), HINTS); } return dashboard; } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java index 8ffd4796fe..0e08ee9c48 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/vc/DefaultEntitiesVersionControlService.java @@ -44,7 +44,6 @@ import org.thingsboard.server.common.data.id.HasId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageLink; -import org.thingsboard.server.common.data.util.ThrowingRunnable; import org.thingsboard.server.common.data.sync.ie.EntityExportData; import org.thingsboard.server.common.data.sync.ie.EntityExportSettings; import org.thingsboard.server.common.data.sync.ie.EntityImportResult; @@ -69,6 +68,7 @@ import org.thingsboard.server.common.data.sync.vc.request.load.EntityTypeVersion import org.thingsboard.server.common.data.sync.vc.request.load.SingleEntityVersionLoadRequest; import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadConfig; import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadRequest; +import org.thingsboard.server.common.data.util.ThrowingRunnable; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException; @@ -259,7 +259,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont return gitServiceQueue.listEntitiesAtVersion(tenantId, versionId); } - @SuppressWarnings({"UnstableApiUsage", "rawtypes"}) + @SuppressWarnings({"rawtypes"}) @Override public UUID loadEntitiesVersion(User user, VersionLoadRequest request) throws Exception { EntitiesImportCtx ctx = new EntitiesImportCtx(UUID.randomUUID(), user, request.getVersionId()); @@ -402,7 +402,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>()) .add(importResult.getSavedEntity().getId()); } - log.debug("Imported {} pack for tenant {}", entityType, ctx.getTenantId()); + log.debug("Imported {} pack ({}) for tenant {}", entityType, entityDataList.size(), ctx.getTenantId()); offset += limit; } while (entityDataList.size() == limit); } diff --git a/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java b/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java index 26e8b3e92a..e493916b8a 100644 --- a/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java +++ b/application/src/test/java/org/thingsboard/server/service/sync/ie/ExportImportServiceSqlTest.java @@ -68,7 +68,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -254,21 +253,25 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest { Asset asset1 = createAsset(tenantId1, null, assetProfile.getId(), "Asset 1"); Asset asset2 = createAsset(tenantId1, null, assetProfile.getId(), "Asset 2"); Dashboard dashboard = createDashboard(tenantId1, null, "Dashboard 1"); + DeviceProfile existingDeviceProfile = createDeviceProfile(tenantId2, null, null, "Existing"); + String aliasId = "23c4185d-1497-9457-30b2-6d91e69a5b2c"; + String unknownUuid = "ea0dc8b0-3d85-11ed-9200-77fc04fa14fa"; String entityAliases = "{\n" + - "\t\"23c4185d-1497-9457-30b2-6d91e69a5b2c\": {\n" + - "\t\t\"alias\": \"assets\",\n" + - "\t\t\"filter\": {\n" + - "\t\t\t\"entityList\": [\n" + - "\t\t\t\t\"" + asset1.getId().toString() + "\",\n" + - "\t\t\t\t\"" + asset2.getId().toString() + "\"\n" + - "\t\t\t],\n" + - "\t\t\t\"entityType\": \"ASSET\",\n" + - "\t\t\t\"resolveMultiple\": true,\n" + - "\t\t\t\"type\": \"entityList\"\n" + - "\t\t},\n" + - "\t\t\"id\": \"23c4185d-1497-9457-30b2-6d91e69a5b2c\"\n" + - "\t}\n" + + "\"" + aliasId + "\": {\n" + + "\"alias\": \"assets\",\n" + + "\"filter\": {\n" + + "\"entityList\": [\n" + + "\"" + asset1.getId().toString() + "\",\n" + + "\"" + asset2.getId().toString() + "\",\n" + + "\"" + tenantId1.getId().toString() + "\",\n" + + "\"" + existingDeviceProfile.getId().toString() + "\",\n" + + "\"" + unknownUuid + "\"\n" + + "],\n" + + "\"resolveMultiple\": true\n" + + "},\n" + + "\"id\": \"" + aliasId + "\"\n" + + "}\n" + "}"; ObjectNode dashboardConfiguration = JacksonUtil.newObjectNode(); dashboardConfiguration.set("entityAliases", JacksonUtil.toJsonNode(entityAliases)); @@ -287,11 +290,23 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest { Asset importedAsset2 = importEntity(tenantAdmin2, asset2ExportData).getSavedEntity(); Dashboard importedDashboard = importEntity(tenantAdmin2, dashboardExportData).getSavedEntity(); - Set entityAliasEntitiesIds = Streams.stream(importedDashboard.getConfiguration() - .get("entityAliases").elements().next().get("filter").get("entityList").elements()) - .map(JsonNode::asText).collect(Collectors.toSet()); - assertThat(entityAliasEntitiesIds).doesNotContain(asset1.getId().toString(), asset2.getId().toString()); - assertThat(entityAliasEntitiesIds).contains(importedAsset1.getId().toString(), importedAsset2.getId().toString()); + Map.Entry entityAlias = importedDashboard.getConfiguration().get("entityAliases").fields().next(); + assertThat(entityAlias.getKey()).isEqualTo(aliasId); + assertThat(entityAlias.getValue().get("id").asText()).isEqualTo(aliasId); + + List aliasEntitiesIds = Streams.stream(entityAlias.getValue().get("filter").get("entityList").elements()) + .map(JsonNode::asText).collect(Collectors.toList()); + assertThat(aliasEntitiesIds).size().isEqualTo(5); + assertThat(aliasEntitiesIds).element(0).as("external asset 1 was replaced with imported one") + .isEqualTo(importedAsset1.getId().toString()); + assertThat(aliasEntitiesIds).element(1).as("external asset 2 was replaced with imported one") + .isEqualTo(importedAsset2.getId().toString()); + assertThat(aliasEntitiesIds).element(2).as("external tenant id was replaced with new tenant id") + .isEqualTo(tenantId2.toString()); + assertThat(aliasEntitiesIds).element(3).as("existing device profile id was left as is") + .isEqualTo(existingDeviceProfile.getId().toString()); + assertThat(aliasEntitiesIds).element(4).as("unresolved uuid was replaced with tenant id") + .isEqualTo(tenantId2.toString()); } @@ -469,7 +484,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest { Device device = createDevice(tenantId1, null, deviceProfile.getId(), "Device 1"); Map entitiesExportData = Stream.of(customer.getId(), asset.getId(), device.getId(), - ruleChain.getId(), dashboard.getId(), assetProfile.getId(), deviceProfile.getId()) + ruleChain.getId(), dashboard.getId(), assetProfile.getId(), deviceProfile.getId()) .map(entityId -> { try { return exportEntity(tenantAdmin1, entityId, EntityExportSettings.builder() diff --git a/monitoring/src/main/conf/logback.xml b/monitoring/src/main/conf/logback.xml index 58e49c7639..3c1e1382b7 100644 --- a/monitoring/src/main/conf/logback.xml +++ b/monitoring/src/main/conf/logback.xml @@ -33,14 +33,19 @@ %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + + %d{ISO8601} [%thread] %-5level %logger{36} - %msg%n + + - + From a5cfe079996e53d38b3a57038c7013915878a00f Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 17 Apr 2023 16:36:33 +0300 Subject: [PATCH 2/3] Edge - improved logic of interruption of sending downlink tasks --- .../service/edge/rpc/EdgeGrpcService.java | 2 +- .../service/edge/rpc/EdgeGrpcSession.java | 58 +++++++++---------- .../service/edge/rpc/EdgeSessionState.java | 2 +- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index f273517b74..5171a6037f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -288,7 +288,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (session != null) { boolean success = false; if (session.isConnected()) { - session.startSyncProcess(tenantId, edgeId, true); + session.startSyncProcess(true); success = true; } clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success)); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index 9e3a22f64f..33aed92682 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -148,7 +148,7 @@ public final class EdgeGrpcSession implements Closeable { if (requestMsg.getSyncRequestMsg().hasFullSync()) { fullSync = requestMsg.getSyncRequestMsg().getFullSync(); } - startSyncProcess(edge.getTenantId(), edge.getId(), fullSync); + startSyncProcess(fullSync); } else { syncCompleted = true; } @@ -192,10 +192,10 @@ public final class EdgeGrpcSession implements Closeable { }; } - public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) { - log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId); + public void startSyncProcess(boolean fullSync) { + log.trace("[{}][{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId(), this.sessionId); syncCompleted = false; - interruptGeneralProcessingOnSync(tenantId, edgeId); + interruptGeneralProcessingOnSync(); doSync(new EdgeSyncCursor(ctx, edge, fullSync)); } @@ -221,9 +221,9 @@ public final class EdgeGrpcSession implements Closeable { .setDownlinkMsgId(EdgeUtils.nextPositiveInt()) .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) .build(); - Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback() { + Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() { @Override - public void onSuccess(Void result) { + public void onSuccess(Boolean isInterrupted) { syncCompleted = true; ctx.getClusterService().onEdgeEventUpdate(edge.getTenantId(), edge.getId()); } @@ -272,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable { } if (sessionState.getPendingMsgsMap().isEmpty()) { log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey()); - stopCurrentSendDownlinkMsgsTask(null); + stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); @@ -367,14 +367,19 @@ public final class EdgeGrpcSession implements Closeable { if (isConnected() && !pageData.getData().isEmpty()) { log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); - Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback() { + Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() { @Override - public void onSuccess(@Nullable Void tmp) { - if (isConnected() && pageData.hasNext()) { - processEdgeEvents(fetcher, pageLink.nextPageLink(), result); + public void onSuccess(@Nullable Boolean isInterrupted) { + if (Boolean.TRUE.equals(isInterrupted)) { + log.debug("[{}][{}][{}] Send downlink messages task was interrupted", edge.getTenantId(), edge.getId(), sessionId); + result.set(null); } else { - UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); - result.set(ifOffset); + if (isConnected() && pageData.hasNext()) { + processEdgeEvents(fetcher, pageLink.nextPageLink(), result); + } else { + UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); + result.set(ifOffset); + } } } @@ -394,7 +399,7 @@ public final class EdgeGrpcSession implements Closeable { } } - private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { + private ListenableFuture sendDownlinkMsgsPack(List downlinkMsgsPack) { interruptPreviousSendDownlinkMsgsTask(); sessionState.setSendDownlinkMsgsFuture(SettableFuture.create()); @@ -433,14 +438,14 @@ public final class EdgeGrpcSession implements Closeable { } else { log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}", this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy); - stopCurrentSendDownlinkMsgsTask(null); + stopCurrentSendDownlinkMsgsTask(false); } } else { - stopCurrentSendDownlinkMsgsTask(null); + stopCurrentSendDownlinkMsgsTask(false); } } catch (Exception e) { log.warn("[{}] Failed to send downlink msgs. Error msg {}", this.sessionId, e.getMessage(), e); - stopCurrentSendDownlinkMsgsTask(e); + stopCurrentSendDownlinkMsgsTask(true); } }; @@ -688,23 +693,18 @@ public final class EdgeGrpcSession implements Closeable { } private void interruptPreviousSendDownlinkMsgsTask() { - String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId); - stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg)); + log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId); + stopCurrentSendDownlinkMsgsTask(true); } - private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) { - String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId); - stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg)); + private void interruptGeneralProcessingOnSync() { + log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", edge.getTenantId(), edge.getId(), this.sessionId); + stopCurrentSendDownlinkMsgsTask(true); } - public void stopCurrentSendDownlinkMsgsTask(Exception e) { + public void stopCurrentSendDownlinkMsgsTask(Boolean isInterrupted) { if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) { - if (e != null) { - log.debug(e.getMessage()); - sessionState.getSendDownlinkMsgsFuture().setException(e); - } else { - sessionState.getSendDownlinkMsgsFuture().set(null); - } + sessionState.getSendDownlinkMsgsFuture().set(isInterrupted); } if (sessionState.getScheduledSendDownlinkTask() != null) { sessionState.getScheduledSendDownlinkTask().cancel(true); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java index 59910c829f..09a7c5add8 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeSessionState.java @@ -28,6 +28,6 @@ import java.util.concurrent.ScheduledFuture; public class EdgeSessionState { private final Map pendingMsgsMap = Collections.synchronizedMap(new LinkedHashMap<>()); - private SettableFuture sendDownlinkMsgsFuture; + private SettableFuture sendDownlinkMsgsFuture; private ScheduledFuture scheduledSendDownlinkTask; } From 9c1e57a8c134a7db86cdbc177054fcb45e5e9d6f Mon Sep 17 00:00:00 2001 From: Andrii Landiak Date: Mon, 17 Apr 2023 18:26:11 +0300 Subject: [PATCH 3/3] Add correct validation message for X.509 certificate in Device Profile --- .../server/dao/device/DeviceProfileServiceImpl.java | 5 ++++- .../device-profile-provision-configuration.component.ts | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java index a5b3e56341..391695f2bf 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceProfileServiceImpl.java @@ -160,9 +160,12 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService