Merge branch 'develop/3.5' of github.com:thingsboard/thingsboard into develop/3.5

This commit is contained in:
Andrii Shvaika 2023-04-18 14:12:32 +03:00
commit a1e816ada4
10 changed files with 102 additions and 85 deletions

View File

@ -288,7 +288,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
if (session != null) {
boolean success = false;
if (session.isConnected()) {
session.startSyncProcess(tenantId, edgeId, true);
session.startSyncProcess(true);
success = true;
}
clusterService.pushEdgeSyncResponseToCore(new FromEdgeSyncResponse(requestId, tenantId, edgeId, success));

View File

@ -148,7 +148,7 @@ public final class EdgeGrpcSession implements Closeable {
if (requestMsg.getSyncRequestMsg().hasFullSync()) {
fullSync = requestMsg.getSyncRequestMsg().getFullSync();
}
startSyncProcess(edge.getTenantId(), edge.getId(), fullSync);
startSyncProcess(fullSync);
} else {
syncCompleted = true;
}
@ -192,10 +192,10 @@ public final class EdgeGrpcSession implements Closeable {
};
}
public void startSyncProcess(TenantId tenantId, EdgeId edgeId, boolean fullSync) {
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
public void startSyncProcess(boolean fullSync) {
log.trace("[{}][{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId(), this.sessionId);
syncCompleted = false;
interruptGeneralProcessingOnSync(tenantId, edgeId);
interruptGeneralProcessingOnSync();
doSync(new EdgeSyncCursor(ctx, edge, fullSync));
}
@ -221,9 +221,9 @@ public final class EdgeGrpcSession implements Closeable {
.setDownlinkMsgId(EdgeUtils.nextPositiveInt())
.setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build())
.build();
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<Void>() {
Futures.addCallback(sendDownlinkMsgsPack(Collections.singletonList(syncCompleteDownlinkMsg)), new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
public void onSuccess(Boolean isInterrupted) {
syncCompleted = true;
ctx.getClusterService().onEdgeEventUpdate(edge.getTenantId(), edge.getId());
}
@ -272,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable {
}
if (sessionState.getPendingMsgsMap().isEmpty()) {
log.debug("[{}] Pending msgs map is empty. Stopping current iteration", edge.getRoutingKey());
stopCurrentSendDownlinkMsgsTask(null);
stopCurrentSendDownlinkMsgsTask(false);
}
} catch (Exception e) {
log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e);
@ -367,14 +367,19 @@ public final class EdgeGrpcSession implements Closeable {
if (isConnected() && !pageData.getData().isEmpty()) {
log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size());
List<DownlinkMsg> downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData());
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<Void>() {
Futures.addCallback(sendDownlinkMsgsPack(downlinkMsgsPack), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
if (isConnected() && pageData.hasNext()) {
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
public void onSuccess(@Nullable Boolean isInterrupted) {
if (Boolean.TRUE.equals(isInterrupted)) {
log.debug("[{}][{}][{}] Send downlink messages task was interrupted", edge.getTenantId(), edge.getId(), sessionId);
result.set(null);
} else {
UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
result.set(ifOffset);
if (isConnected() && pageData.hasNext()) {
processEdgeEvents(fetcher, pageLink.nextPageLink(), result);
} else {
UUID ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId();
result.set(ifOffset);
}
}
}
@ -394,7 +399,7 @@ public final class EdgeGrpcSession implements Closeable {
}
}
private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
private ListenableFuture<Boolean> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
interruptPreviousSendDownlinkMsgsTask();
sessionState.setSendDownlinkMsgsFuture(SettableFuture.create());
@ -433,14 +438,14 @@ public final class EdgeGrpcSession implements Closeable {
} else {
log.warn("[{}] Failed to deliver the batch after {} attempts. Next messages are going to be discarded {}",
this.sessionId, MAX_DOWNLINK_ATTEMPTS, copy);
stopCurrentSendDownlinkMsgsTask(null);
stopCurrentSendDownlinkMsgsTask(false);
}
} else {
stopCurrentSendDownlinkMsgsTask(null);
stopCurrentSendDownlinkMsgsTask(false);
}
} catch (Exception e) {
log.warn("[{}] Failed to send downlink msgs. Error msg {}", this.sessionId, e.getMessage(), e);
stopCurrentSendDownlinkMsgsTask(e);
stopCurrentSendDownlinkMsgsTask(true);
}
};
@ -688,23 +693,18 @@ public final class EdgeGrpcSession implements Closeable {
}
private void interruptPreviousSendDownlinkMsgsTask() {
String msg = String.format("[%s] Previous send downlink future was not properly completed, stopping it now!", this.sessionId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
log.debug("[{}][{}][{}] Previous send downlink future was not properly completed, stopping it now!", edge.getTenantId(), edge.getId(), this.sessionId);
stopCurrentSendDownlinkMsgsTask(true);
}
private void interruptGeneralProcessingOnSync(TenantId tenantId, EdgeId edgeId) {
String msg = String.format("[%s][%s] Sync process started. General processing interrupted!", tenantId, edgeId);
stopCurrentSendDownlinkMsgsTask(new RuntimeException(msg));
private void interruptGeneralProcessingOnSync() {
log.debug("[{}][{}][{}] Sync process started. General processing interrupted!", edge.getTenantId(), edge.getId(), this.sessionId);
stopCurrentSendDownlinkMsgsTask(true);
}
public void stopCurrentSendDownlinkMsgsTask(Exception e) {
public void stopCurrentSendDownlinkMsgsTask(Boolean isInterrupted) {
if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
if (e != null) {
log.debug(e.getMessage());
sessionState.getSendDownlinkMsgsFuture().setException(e);
} else {
sessionState.getSendDownlinkMsgsFuture().set(null);
}
sessionState.getSendDownlinkMsgsFuture().set(isInterrupted);
}
if (sessionState.getScheduledSendDownlinkTask() != null) {
sessionState.getScheduledSendDownlinkTask().cancel(true);

View File

@ -28,6 +28,6 @@ import java.util.concurrent.ScheduledFuture;
public class EdgeSessionState {
private final Map<Integer, DownlinkMsg> pendingMsgsMap = Collections.synchronizedMap(new LinkedHashMap<>());
private SettableFuture<Void> sendDownlinkMsgsFuture;
private SettableFuture<Boolean> sendDownlinkMsgsFuture;
private ScheduledFuture<?> scheduledSendDownlinkTask;
}

View File

@ -23,7 +23,6 @@ import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
@ -87,7 +86,6 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
@Autowired
protected TbNotificationEntityService entityNotificationService;
@Transactional(rollbackFor = Exception.class)
@Override
public EntityImportResult<E> importEntity(EntitiesImportCtx ctx, D exportData) throws ThingsboardException {
EntityImportResult<E> importResult = new EntityImportResult<>();
@ -336,27 +334,37 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
if (externalUuid.equals(EntityId.NULL_UUID)) return Optional.empty();
for (EntityType entityType : EntityType.values()) {
Optional<EntityId> externalIdOpt = buildEntityId(entityType, externalUuid);
if (!externalIdOpt.isPresent()) {
Optional<EntityId> externalId = buildEntityId(entityType, externalUuid);
if (externalId.isEmpty()) {
continue;
}
EntityId internalId = ctx.getInternalId(externalIdOpt.get());
EntityId internalId = ctx.getInternalId(externalId.get());
if (internalId != null) {
return Optional.of(internalId);
}
}
if (fetchAllUUIDs) {
for (EntityType entityType : hints) {
Optional<EntityId> internalId = lookupInDb(externalUuid, entityType);
if (internalId.isPresent()) return internalId;
}
Set<EntityType> processLast = Set.of(EntityType.TENANT);
List<EntityType> entityTypes = new ArrayList<>(hints);
for (EntityType entityType : EntityType.values()) {
if (hints.contains(entityType)) {
if (!hints.contains(entityType) && !processLast.contains(entityType)) {
entityTypes.add(entityType);
}
}
entityTypes.addAll(processLast);
for (EntityType entityType : entityTypes) {
Optional<EntityId> externalId = buildEntityId(entityType, externalUuid);
if (externalId.isEmpty() || ctx.isNotFound(externalId.get())) {
continue;
}
Optional<EntityId> internalId = lookupInDb(externalUuid, entityType);
if (internalId.isPresent()) return internalId;
EntityId internalId = getInternalId(externalId.get(), false);
if (internalId != null) {
return Optional.of(internalId);
} else {
ctx.registerNotFound(externalId.get());
}
}
}
@ -364,20 +372,6 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
return Optional.empty();
}
private Optional<EntityId> lookupInDb(UUID externalUuid, EntityType entityType) {
Optional<EntityId> externalIdOpt = buildEntityId(entityType, externalUuid);
if (externalIdOpt.isEmpty() || ctx.isNotFound(externalIdOpt.get())) {
return Optional.empty();
}
EntityId internalId = getInternalId(externalIdOpt.get(), false);
if (internalId != null) {
return Optional.of(internalId);
} else {
ctx.registerNotFound(externalIdOpt.get());
}
return Optional.empty();
}
private Optional<EntityId> buildEntityId(EntityType entityType, UUID externalUuid) {
try {
return Optional.of(EntityIdFactory.getByTypeAndUuid(entityType, externalUuid));

View File

@ -65,10 +65,10 @@ public class DashboardImportService extends BaseEntityImportService<DashboardId,
@Override
protected Dashboard prepare(EntitiesImportCtx ctx, Dashboard dashboard, Dashboard old, EntityExportData<Dashboard> exportData, IdProvider idProvider) {
for (JsonNode entityAlias : dashboard.getEntityAliasesConfig()) {
replaceIdsRecursively(ctx, idProvider, entityAlias, Collections.emptySet(), HINTS);
replaceIdsRecursively(ctx, idProvider, entityAlias, Set.of("id"), HINTS);
}
for (JsonNode widgetConfig : dashboard.getWidgetsConfig()) {
replaceIdsRecursively(ctx, idProvider, JacksonUtil.getSafely(widgetConfig, "config", "actions"), Collections.singleton("id"), HINTS);
replaceIdsRecursively(ctx, idProvider, JacksonUtil.getSafely(widgetConfig, "config", "actions"), Set.of("id"), HINTS);
}
return dashboard;
}

View File

@ -44,7 +44,6 @@ import org.thingsboard.server.common.data.id.HasId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.util.ThrowingRunnable;
import org.thingsboard.server.common.data.sync.ie.EntityExportData;
import org.thingsboard.server.common.data.sync.ie.EntityExportSettings;
import org.thingsboard.server.common.data.sync.ie.EntityImportResult;
@ -69,6 +68,7 @@ import org.thingsboard.server.common.data.sync.vc.request.load.EntityTypeVersion
import org.thingsboard.server.common.data.sync.vc.request.load.SingleEntityVersionLoadRequest;
import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadConfig;
import org.thingsboard.server.common.data.sync.vc.request.load.VersionLoadRequest;
import org.thingsboard.server.common.data.util.ThrowingRunnable;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.exception.DeviceCredentialsValidationException;
@ -259,7 +259,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
return gitServiceQueue.listEntitiesAtVersion(tenantId, versionId);
}
@SuppressWarnings({"UnstableApiUsage", "rawtypes"})
@SuppressWarnings({"rawtypes"})
@Override
public UUID loadEntitiesVersion(User user, VersionLoadRequest request) throws Exception {
EntitiesImportCtx ctx = new EntitiesImportCtx(UUID.randomUUID(), user, request.getVersionId());
@ -402,7 +402,7 @@ public class DefaultEntitiesVersionControlService implements EntitiesVersionCont
ctx.getImportedEntities().computeIfAbsent(entityType, t -> new HashSet<>())
.add(importResult.getSavedEntity().getId());
}
log.debug("Imported {} pack for tenant {}", entityType, ctx.getTenantId());
log.debug("Imported {} pack ({}) for tenant {}", entityType, entityDataList.size(), ctx.getTenantId());
offset += limit;
} while (entityDataList.size() == limit);
}

View File

@ -68,7 +68,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -254,21 +253,25 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
Asset asset1 = createAsset(tenantId1, null, assetProfile.getId(), "Asset 1");
Asset asset2 = createAsset(tenantId1, null, assetProfile.getId(), "Asset 2");
Dashboard dashboard = createDashboard(tenantId1, null, "Dashboard 1");
DeviceProfile existingDeviceProfile = createDeviceProfile(tenantId2, null, null, "Existing");
String aliasId = "23c4185d-1497-9457-30b2-6d91e69a5b2c";
String unknownUuid = "ea0dc8b0-3d85-11ed-9200-77fc04fa14fa";
String entityAliases = "{\n" +
"\t\"23c4185d-1497-9457-30b2-6d91e69a5b2c\": {\n" +
"\t\t\"alias\": \"assets\",\n" +
"\t\t\"filter\": {\n" +
"\t\t\t\"entityList\": [\n" +
"\t\t\t\t\"" + asset1.getId().toString() + "\",\n" +
"\t\t\t\t\"" + asset2.getId().toString() + "\"\n" +
"\t\t\t],\n" +
"\t\t\t\"entityType\": \"ASSET\",\n" +
"\t\t\t\"resolveMultiple\": true,\n" +
"\t\t\t\"type\": \"entityList\"\n" +
"\t\t},\n" +
"\t\t\"id\": \"23c4185d-1497-9457-30b2-6d91e69a5b2c\"\n" +
"\t}\n" +
"\"" + aliasId + "\": {\n" +
"\"alias\": \"assets\",\n" +
"\"filter\": {\n" +
"\"entityList\": [\n" +
"\"" + asset1.getId().toString() + "\",\n" +
"\"" + asset2.getId().toString() + "\",\n" +
"\"" + tenantId1.getId().toString() + "\",\n" +
"\"" + existingDeviceProfile.getId().toString() + "\",\n" +
"\"" + unknownUuid + "\"\n" +
"],\n" +
"\"resolveMultiple\": true\n" +
"},\n" +
"\"id\": \"" + aliasId + "\"\n" +
"}\n" +
"}";
ObjectNode dashboardConfiguration = JacksonUtil.newObjectNode();
dashboardConfiguration.set("entityAliases", JacksonUtil.toJsonNode(entityAliases));
@ -287,11 +290,23 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
Asset importedAsset2 = importEntity(tenantAdmin2, asset2ExportData).getSavedEntity();
Dashboard importedDashboard = importEntity(tenantAdmin2, dashboardExportData).getSavedEntity();
Set<String> entityAliasEntitiesIds = Streams.stream(importedDashboard.getConfiguration()
.get("entityAliases").elements().next().get("filter").get("entityList").elements())
.map(JsonNode::asText).collect(Collectors.toSet());
assertThat(entityAliasEntitiesIds).doesNotContain(asset1.getId().toString(), asset2.getId().toString());
assertThat(entityAliasEntitiesIds).contains(importedAsset1.getId().toString(), importedAsset2.getId().toString());
Map.Entry<String, JsonNode> entityAlias = importedDashboard.getConfiguration().get("entityAliases").fields().next();
assertThat(entityAlias.getKey()).isEqualTo(aliasId);
assertThat(entityAlias.getValue().get("id").asText()).isEqualTo(aliasId);
List<String> aliasEntitiesIds = Streams.stream(entityAlias.getValue().get("filter").get("entityList").elements())
.map(JsonNode::asText).collect(Collectors.toList());
assertThat(aliasEntitiesIds).size().isEqualTo(5);
assertThat(aliasEntitiesIds).element(0).as("external asset 1 was replaced with imported one")
.isEqualTo(importedAsset1.getId().toString());
assertThat(aliasEntitiesIds).element(1).as("external asset 2 was replaced with imported one")
.isEqualTo(importedAsset2.getId().toString());
assertThat(aliasEntitiesIds).element(2).as("external tenant id was replaced with new tenant id")
.isEqualTo(tenantId2.toString());
assertThat(aliasEntitiesIds).element(3).as("existing device profile id was left as is")
.isEqualTo(existingDeviceProfile.getId().toString());
assertThat(aliasEntitiesIds).element(4).as("unresolved uuid was replaced with tenant id")
.isEqualTo(tenantId2.toString());
}
@ -469,7 +484,7 @@ public class ExportImportServiceSqlTest extends BaseExportImportServiceTest {
Device device = createDevice(tenantId1, null, deviceProfile.getId(), "Device 1");
Map<EntityType, EntityExportData> entitiesExportData = Stream.of(customer.getId(), asset.getId(), device.getId(),
ruleChain.getId(), dashboard.getId(), assetProfile.getId(), deviceProfile.getId())
ruleChain.getId(), dashboard.getId(), assetProfile.getId(), deviceProfile.getId())
.map(entityId -> {
try {
return exportEntity(tenantAdmin1, entityId, EntityExportSettings.builder()

View File

@ -160,9 +160,12 @@ public class DeviceProfileServiceImpl extends AbstractCachedEntityService<Device
handleEvictEvent(new DeviceProfileEvictEvent(deviceProfile.getTenantId(), deviceProfile.getName(),
oldDeviceProfile != null ? oldDeviceProfile.getName() : null, null, deviceProfile.isDefault(),
oldDeviceProfile != null ? oldDeviceProfile.getProvisionDeviceKey() : null));
String unqProvisionKeyErrorMsg = DeviceProfileProvisionType.X509_CERTIFICATE_CHAIN.equals(deviceProfile.getProvisionType())
? "Device profile with such certificate already exists!"
: "Device profile with such provision device key already exists!";
checkConstraintViolation(t,
Map.of("device_profile_name_unq_key", DEVICE_PROFILE_WITH_SUCH_NAME_ALREADY_EXISTS,
"device_provision_key_unq_key", "Device profile with such provision device key already exists!",
"device_provision_key_unq_key", unqProvisionKeyErrorMsg,
"device_profile_external_id_unq_key", "Device profile with such external id already exists!"));
throw t;
}

View File

@ -33,14 +33,19 @@
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="org" level="WARN"/>
<logger name="org.thingsboard.server" level="INFO"/>
<logger name="org.thingsboard.monitoring" level="DEBUG"/>
<logger name="org.thingsboard.monitoring.client" level="INFO"/>
<root level="INFO">
<appender-ref ref="fileLogAppender"/>
<appender-ref ref="STDOUT"/>
</root>
</configuration>

View File

@ -107,7 +107,7 @@ export class DeviceProfileProvisionConfigurationComponent implements ControlValu
}
const certificateRegExPattern: string = this.provisionConfigurationFormGroup.get('certificateRegExPattern').value;
if (!certificateRegExPattern || !certificateRegExPattern.length) {
this.provisionConfigurationFormGroup.get('certificateRegExPattern').patchValue('[\\w]*', {emitEvent: false});
this.provisionConfigurationFormGroup.get('certificateRegExPattern').patchValue('(.*)', {emitEvent: false});
}
const allowCreateNewDevicesByX509Certificate: boolean | null = this.provisionConfigurationFormGroup.get('allowCreateNewDevicesByX509Certificate').value;
if (!isBoolean(allowCreateNewDevicesByX509Certificate)) {