diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index 024191a270..92ae38f7e9 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; import org.thingsboard.server.common.data.edqs.EdqsObject; import org.thingsboard.server.common.data.edqs.EdqsState; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode; import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus; import org.thingsboard.server.common.data.edqs.EdqsSyncRequest; import org.thingsboard.server.common.data.edqs.Entity; @@ -55,7 +56,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.edqs.processor.EdqsProducer; import org.thingsboard.server.edqs.state.EdqsPartitionService; -import org.thingsboard.server.edqs.util.EdqsConverter; +import org.thingsboard.server.edqs.util.DefaultEdqsMapper; +import org.thingsboard.server.edqs.util.EdqsMapper; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; @@ -82,7 +84,7 @@ import java.util.concurrent.TimeUnit; public class DefaultEdqsService implements EdqsService { private final EdqsClientQueueFactory queueFactory; - private final EdqsConverter edqsConverter; + private final EdqsMapper edqsMapper; private final EdqsSyncService edqsSyncService; private final EdqsApiService edqsApiService; private final DistributedLockService distributedLockService; @@ -182,7 +184,12 @@ public class DefaultEdqsService implements EdqsService { log.info("Processing system msg {}", msg); try { if (msg.getApiEnabled() != null) { - state.setApiEnabled(msg.getApiEnabled()); + if (msg.getApiEnabled()) { + state.setApiMode(EdqsApiMode.ENABLED); + } else { + state.setApiMode(EdqsApiMode.DISABLED); + } + log.info("New state: {}", state); } if (msg.getEdqsReady() != null) { onEdqsReady(msg.getEdqsReady()); @@ -252,8 +259,8 @@ public class DefaultEdqsService implements EdqsService { if (state.isApiReady()) { if (autoEnableApi) { - if (state.getApiEnabled() == null) { - state.setApiEnabled(true); + if (state.getApiMode() == null || state.getApiMode() == EdqsApiMode.AUTO_DISABLED) { + state.setApiMode(EdqsApiMode.AUTO_ENABLED); log.info("New state: {}. Auto-enabled EDQS API", state); } else { log.info("New state: {}. API mode left as is", state); @@ -263,7 +270,7 @@ public class DefaultEdqsService implements EdqsService { } } else { if (state.isApiEnabled()) { - state.setApiEnabled(false); + state.setApiMode(EdqsApiMode.AUTO_DISABLED); log.info("New state: {}. Disabled EDQS API", state); } else { log.info("New state: {}. API left disabled", state); @@ -284,7 +291,7 @@ public class DefaultEdqsService implements EdqsService { log.trace("[{}][{}] Ignoring update event, type {} not supported", tenantId, entityId, entityType); return; } - onUpdate(tenantId, objectType, edqsConverter.toEntity(entityType, entity)); + onUpdate(tenantId, objectType, DefaultEdqsMapper.toEntity(entityType, entity)); } @Override @@ -311,12 +318,11 @@ public class DefaultEdqsService implements EdqsService { protected void processEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType, EdqsObject object) { executor.submit(() -> { try { - String key = object.key(); + String key = object.stringKey(); Long version = object.version(); EdqsEventMsg.Builder eventMsg = EdqsEventMsg.newBuilder() - .setKey(key) .setObjectType(objectType.name()) - .setData(ByteString.copyFrom(edqsConverter.serialize(objectType, object))) + .setData(ByteString.copyFrom(edqsMapper.serialize(object))) .setEventType(eventType.name()); if (version != null) { eventMsg.setVersion(version); diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index bebddd3f87..ea60d8dc44 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1782,6 +1782,8 @@ queue: max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" # Thread pool size for EDQS requests executor request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}" + # Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process. + versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}" # Strings longer than this threshold will be compressed string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}" stats: diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index b84a324bf7..1bea463159 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -15,10 +15,15 @@ */ package org.thingsboard.server.controller; +import org.assertj.core.api.ThrowingConsumer; import org.junit.Before; +import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.test.context.TestPropertySource; +import org.thingsboard.server.common.data.edqs.EdqsState; +import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode; +import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; @@ -26,9 +31,11 @@ import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.dao.service.DaoSqlTest; import org.thingsboard.server.edqs.util.EdqsRocksDb; +import org.thingsboard.server.queue.discovery.DiscoveryService; import java.util.concurrent.TimeUnit; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @DaoSqlTest @@ -39,13 +46,16 @@ import static org.awaitility.Awaitility.await; "queue.edqs.api.supported=true", "queue.edqs.api.auto_enable=true", "queue.edqs.mode=local", - "queue.edqs.readiness_check_interval=1000" + "queue.edqs.readiness_check_interval=500" }) public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { @Autowired private EdqsService edqsService; + @Autowired + private DiscoveryService discoveryService; + @MockBean // so that we don't do backup for tests private EdqsRocksDb edqsRocksDb; @@ -66,4 +76,61 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { result -> result == expectedResult); } + @Test + public void testEdqsState() { + assertThat(edqsService.getState().getApiMode()).isEqualTo(EdqsApiMode.AUTO_ENABLED); + + // notifying EDQS is not ready: API should be auto-disabled + discoveryService.setReady(false); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED); + assertThat(state.getEdqsReady()).isFalse(); + assertThat(state.isApiEnabled()).isFalse(); + }); + + // manually disabling API + edqsService.processSystemRequest(ToCoreEdqsRequest.builder() + .apiEnabled(false) + .build()); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED); + assertThat(state.getEdqsReady()).isFalse(); + assertThat(state.isApiEnabled()).isFalse(); + }); + + // notifying EDQS is ready: API should not be enabled automatically because manually disabled previously + discoveryService.setReady(true); + verifyState(state -> { + assertThat(state.getEdqsReady()).isTrue(); + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED); + assertThat(state.isApiEnabled()).isFalse(); + }); + + // manually enabling API + edqsService.processSystemRequest(ToCoreEdqsRequest.builder() + .apiEnabled(true) + .build()); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.ENABLED); + assertThat(state.getEdqsReady()).isTrue(); + assertThat(state.isApiEnabled()).isTrue(); + }); + + // notifying EDQS is not ready: API should be auto-disabled + discoveryService.setReady(false); + verifyState(state -> { + assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED); + assertThat(state.getEdqsReady()).isFalse(); + assertThat(state.isApiEnabled()).isFalse(); + }); + + discoveryService.setReady(true); + } + + private void verifyState(ThrowingConsumer assertion) { + await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> { + assertThat(edqsService.getState()).satisfies(assertion); + }); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java index c162365257..3ffd6ad424 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/AttributeKv.java @@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import java.util.UUID; + @Data @AllArgsConstructor @NoArgsConstructor @@ -58,7 +60,7 @@ public class AttributeKv implements EdqsObject { } @Override - public String key() { + public String stringKey() { return "a_" + entityId + "_" + scope + "_" + key; } @@ -72,4 +74,6 @@ public class AttributeKv implements EdqsObject { return ObjectType.ATTRIBUTE_KV; } + public record Key(UUID entityId, AttributeScope scope, int key) implements EdqsObjectKey {} + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java index a74c90208a..f4685f896a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObject.java @@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.ObjectType; public interface EdqsObject { @JsonIgnore - String key(); + String stringKey(); @JsonIgnore Long version(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java new file mode 100644 index 0000000000..8d48bbd383 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsObjectKey.java @@ -0,0 +1,18 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.edqs; + +public interface EdqsObjectKey {} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java index 52778ffd7e..7c3b328022 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/EdqsState.java @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.edqs; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import org.apache.commons.lang3.BooleanUtils; @Getter @@ -26,9 +27,10 @@ import org.apache.commons.lang3.BooleanUtils; public class EdqsState { private Boolean edqsReady; + @Setter private EdqsSyncStatus syncStatus; - - private Boolean apiEnabled; // null until auto-enabled or set manually + @Setter + private EdqsApiMode apiMode; public boolean setEdqsReady(boolean ready) { boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.edqsReady, false) != ready; @@ -36,22 +38,12 @@ public class EdqsState { return changed; } - public void setSyncStatus(EdqsSyncStatus syncStatus) { - this.syncStatus = syncStatus; - } - - public boolean setApiEnabled(boolean apiEnabled) { - boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.apiEnabled, false) != apiEnabled; - this.apiEnabled = apiEnabled; - return changed; - } - public boolean isApiReady() { return edqsReady && syncStatus == EdqsSyncStatus.FINISHED; } public boolean isApiEnabled() { - return apiEnabled != null && apiEnabled; + return apiMode != null && (apiMode == EdqsApiMode.ENABLED || apiMode == EdqsApiMode.AUTO_ENABLED); } @Override @@ -59,7 +51,7 @@ public class EdqsState { return '[' + "EDQS ready: " + edqsReady + ", sync status: " + syncStatus + - ", API enabled: " + apiEnabled + + ", API mode: " + apiMode + ']'; } @@ -70,4 +62,11 @@ public class EdqsState { FAILED } + public enum EdqsApiMode { + ENABLED, + AUTO_ENABLED, + DISABLED, + AUTO_DISABLED + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java index c22ef147e3..96a5b26aee 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/Entity.java @@ -51,7 +51,7 @@ public class Entity implements EdqsObject { } @Override - public String key() { + public String stringKey() { return "e_" + fields.getId().toString(); } @@ -65,4 +65,6 @@ public class Entity implements EdqsObject { return ObjectType.fromEntityType(type); } + public record Key(UUID id) implements EdqsObjectKey {} + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java index 8bd69c41a4..12695de423 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/LatestTsKv.java @@ -24,6 +24,8 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import java.util.UUID; + @Data @AllArgsConstructor @NoArgsConstructor @@ -53,7 +55,8 @@ public class LatestTsKv implements EdqsObject { this.version = version != null ? version : 0L; } - public String key() { + @Override + public String stringKey() { return "l_" + entityId + "_" + key; } @@ -67,4 +70,6 @@ public class LatestTsKv implements EdqsObject { return ObjectType.LATEST_TS_KV; } + public record Key(UUID entityId, int key) implements EdqsObjectKey {} + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java index 1b0975542c..2eae806f43 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/EntityFields.java @@ -29,9 +29,7 @@ public interface EntityFields { Logger log = LoggerFactory.getLogger(EntityFields.class); - default UUID getId() { - return null; - } + UUID getId(); default UUID getTenantId() { return null; @@ -147,6 +145,7 @@ public interface EntityFields { default String getAsString(String key) { return switch (key) { + case "id" -> getId().toString(); case "createdTime" -> Long.toString(getCreatedTime()); case "title" -> getName(); case "type" -> getType(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java index 8980d0e634..017c9e5010 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/relation/EntityRelation.java @@ -27,10 +27,12 @@ import org.thingsboard.server.common.data.BaseDataWithAdditionalInfo; import org.thingsboard.server.common.data.HasVersion; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.validation.Length; import java.io.Serializable; +import java.util.UUID; @Slf4j @Schema @@ -118,8 +120,8 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject { BaseDataWithAdditionalInfo.setJson(addInfo, json -> this.additionalInfo = json, bytes -> this.additionalInfoBytes = bytes); } - @JsonIgnore - public String key() { + @Override + public String stringKey() { return "r_" + from + "_" + to + "_" + typeGroup + "_" + type; } @@ -133,4 +135,6 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject { return ObjectType.RELATION; } + public record Key(UUID from, UUID to, RelationTypeGroup typeGroup, String type) implements EdqsObjectKey {} + } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 7b30fbaaa5..12256f56ca 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -45,7 +45,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.edqs.repo.EdqsRepository; import org.thingsboard.server.edqs.state.EdqsPartitionService; import org.thingsboard.server.edqs.state.EdqsStateService; -import org.thingsboard.server.edqs.util.EdqsConverter; +import org.thingsboard.server.edqs.util.EdqsMapper; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; @@ -81,7 +81,7 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop public class EdqsProcessor implements TbQueueHandler, TbProtoQueueMsg> { private final EdqsQueueFactory queueFactory; - private final EdqsConverter converter; + private final EdqsMapper mapper; private final EdqsRepository repository; private final EdqsConfig config; private final EdqsExecutors edqsExecutors; @@ -92,8 +92,7 @@ public class EdqsProcessor implements TbQueueHandler, private PartitionedQueueConsumerManager> eventConsumer; private PartitionedQueueResponseTemplate, TbProtoQueueMsg> responseTemplate; private ListeningExecutorService requestExecutor; - - private final VersionsStore versionsStore = new VersionsStore(); + private VersionsStore versionsStore; private final AtomicInteger counter = new AtomicInteger(); @Getter @@ -110,6 +109,7 @@ public class EdqsProcessor implements TbQueueHandler, } }; requestExecutor = edqsExecutors.getRequestExecutor(); + versionsStore = new VersionsStore(config.getVersionsCacheTtl()); eventConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic())) @@ -224,22 +224,20 @@ public class EdqsProcessor implements TbQueueHandler, TenantId tenantId = getTenantId(edqsMsg); ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); - String key = eventMsg.getKey(); Long version = eventMsg.hasVersion() ? eventMsg.getVersion() : null; + EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), false); if (version != null) { - if (!versionsStore.isNew(key, version)) { + if (!versionsStore.isNew(mapper.getKey(object), version)) { return; } } else if (!ObjectType.unversionedTypes.contains(objectType)) { - log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key); + log.warn("[{}] {} doesn't have version: {}", tenantId, objectType, object); } if (backup) { - stateService.save(tenantId, objectType, key, eventType, edqsMsg); + stateService.save(tenantId, objectType, object.stringKey(), eventType, edqsMsg); } - EdqsObject object = converter.deserialize(objectType, eventMsg.getData().toByteArray()); - log.debug("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version); int count = counter.incrementAndGet(); if (count % 100000 == 0) { log.info("Processed {} events", count); @@ -251,6 +249,7 @@ public class EdqsProcessor implements TbQueueHandler, .eventType(eventType) .object(object) .build(); + log.debug("Processing event: {}", event); repository.processEvent(event); } } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 7ad0258cbb..b506c10473 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -22,11 +22,13 @@ import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.EdqsEventType; +import org.thingsboard.server.common.data.edqs.EdqsObject; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.edqs.processor.EdqsProcessor; import org.thingsboard.server.edqs.processor.EdqsProducer; +import org.thingsboard.server.edqs.util.EdqsMapper; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg; @@ -63,6 +65,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final KafkaEdqsQueueFactory queueFactory; private final DiscoveryService discoveryService; private final EdqsExecutors edqsExecutors; + private final EdqsMapper mapper; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -71,8 +74,8 @@ public class KafkaEdqsStateService implements EdqsStateService { private KafkaQueueStateService, TbProtoQueueMsg> queueStateService; private QueueConsumerManager> eventsToBackupConsumer; private EdqsProducer stateProducer; + private VersionsStore versionsStore; - private final VersionsStore versionsStore = new VersionsStore(); private final AtomicInteger stateReadCount = new AtomicInteger(); private final AtomicInteger eventsReadCount = new AtomicInteger(); @@ -80,6 +83,7 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers) { + versionsStore = new VersionsStore(config.getVersionsCacheTtl()); TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) @@ -122,22 +126,25 @@ public class KafkaEdqsStateService implements EdqsStateService { if (msg.hasEventMsg()) { EdqsEventMsg eventMsg = msg.getEventMsg(); - String key = eventMsg.getKey(); - int count = eventsReadCount.incrementAndGet(); - if (count % 100000 == 0) { - log.info("[events-to-backup] Processed {} msgs", count); - } + ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); + EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), true); + if (eventMsg.hasVersion()) { - if (!versionsStore.isNew(key, eventMsg.getVersion())) { + if (!versionsStore.isNew(mapper.getKey(object), eventMsg.getVersion())) { continue; } } TenantId tenantId = getTenantId(msg); - ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType()); EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType()); + String key = object.stringKey(); log.trace("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key); - stateProducer.send(tenantId, objectType, key, msg); + stateProducer.send(tenantId, objectType, object.stringKey(), msg); + + int count = eventsReadCount.incrementAndGet(); + if (count % 100000 == 0) { + log.info("[events-to-backup] Processed {} msgs", count); + } } } catch (Throwable t) { log.error("Failed to process message: {}", queueMsg, t); diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java similarity index 80% rename from common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java rename to common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java index 9a84436f36..0d3fc35b08 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsConverter.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/DefaultEdqsMapper.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import com.fasterxml.jackson.databind.json.JsonMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.google.protobuf.ByteString; import lombok.RequiredArgsConstructor; @@ -36,6 +35,7 @@ import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.edqs.AttributeKv; import org.thingsboard.server.common.data.edqs.DataPoint; import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; import org.thingsboard.server.common.data.edqs.Entity; import org.thingsboard.server.common.data.edqs.LatestTsKv; import org.thingsboard.server.common.data.edqs.fields.FieldsUtil; @@ -52,6 +52,7 @@ import org.thingsboard.server.edqs.data.dp.DoubleDataPoint; import org.thingsboard.server.edqs.data.dp.JsonDataPoint; import org.thingsboard.server.edqs.data.dp.LongDataPoint; import org.thingsboard.server.edqs.data.dp.StringDataPoint; +import org.thingsboard.server.edqs.repo.KeyDictionary; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto; import org.xerial.snappy.Snappy; @@ -65,19 +66,29 @@ import java.util.UUID; @Service @RequiredArgsConstructor @Slf4j -public class EdqsConverter { +public class DefaultEdqsMapper implements EdqsMapper { private final EdqsStatsService edqsStatsService; @Value("${queue.edqs.string_compression_length_threshold:512}") private int stringCompressionLengthThreshold; - private final Map> converters = new HashMap<>(); - private final Converter defaultConverter = new JsonConverter<>(Entity.class); + private final Map> mappers = new HashMap<>(); + private final Mapper defaultMapper = new JsonMapper<>(Entity.class) { + @Override + public EdqsObjectKey getKey(Entity entity) { + return new Entity.Key(entity.getFields().getId()); + } + }; { - converters.put(ObjectType.RELATION, new JsonConverter<>(EntityRelation.class)); - converters.put(ObjectType.ATTRIBUTE_KV, new Converter() { + mappers.put(ObjectType.RELATION, new JsonMapper<>(EntityRelation.class) { + @Override + public EdqsObjectKey getKey(EntityRelation relation) { + return new EntityRelation.Key(relation.getFrom().getId(), relation.getTo().getId(), relation.getTypeGroup(), relation.getType()); + } + }); + mappers.put(ObjectType.ATTRIBUTE_KV, new Mapper() { @Override public byte[] serialize(ObjectType type, AttributeKv attributeKv) { var proto = TransportProtos.AttributeKvProto.newBuilder() @@ -94,12 +105,12 @@ public class EdqsConverter { } @Override - public AttributeKv deserialize(ObjectType type, byte[] bytes) throws Exception { + public AttributeKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception { TransportProtos.AttributeKvProto proto = TransportProtos.AttributeKvProto.parseFrom(bytes); EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); AttributeScope scope = AttributeScope.values()[proto.getScope().getNumber()]; - DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null; + DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint()); return AttributeKv.builder() .entityId(entityId) .scope(scope) @@ -108,8 +119,13 @@ public class EdqsConverter { .dataPoint(dataPoint) .build(); } + + @Override + public EdqsObjectKey getKey(AttributeKv attributeKv) { + return new AttributeKv.Key(attributeKv.getEntityId().getId(), attributeKv.getScope(), KeyDictionary.get(attributeKv.getKey())); + } }); - converters.put(ObjectType.LATEST_TS_KV, new Converter() { + mappers.put(ObjectType.LATEST_TS_KV, new Mapper() { @Override public byte[] serialize(ObjectType type, LatestTsKv latestTsKv) { var proto = TransportProtos.LatestTsKvProto.newBuilder() @@ -125,11 +141,11 @@ public class EdqsConverter { } @Override - public LatestTsKv deserialize(ObjectType type, byte[] bytes) throws Exception { + public LatestTsKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception { TransportProtos.LatestTsKvProto proto = TransportProtos.LatestTsKvProto.parseFrom(bytes); EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()), new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB())); - DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null; + DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint()); return LatestTsKv.builder() .entityId(entityId) .key(proto.getKey()) @@ -137,6 +153,11 @@ public class EdqsConverter { .dataPoint(dataPoint) .build(); } + + @Override + public EdqsObjectKey getKey(LatestTsKv latestTsKv) { + return new LatestTsKv.Key(latestTsKv.getEntityId().getId(), KeyDictionary.get(latestTsKv.getKey())); + } }); } @@ -223,30 +244,30 @@ public class EdqsConverter { @SuppressWarnings("unchecked") @SneakyThrows - public byte[] serialize(ObjectType type, T value) { - Converter converter = (Converter) converters.get(type); - if (converter != null) { - return converter.serialize(type, value); - } else { - return defaultConverter.serialize(type, (Entity) value); - } + public byte[] serialize(T value) { + ObjectType type = value.type(); + Mapper mapper = (Mapper) mappers.getOrDefault(type, defaultMapper); + return mapper.serialize(type, value); } @SneakyThrows - public EdqsObject deserialize(ObjectType type, byte[] bytes) { - Converter converter = converters.get(type); - if (converter != null) { - return converter.deserialize(type, bytes); - } else { - return defaultConverter.deserialize(type, bytes); - } + public EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey) { + Mapper mapper = mappers.getOrDefault(type, defaultMapper); + return mapper.deserialize(type, bytes, onlyKey); + } + + @SuppressWarnings("unchecked") + @SneakyThrows + public EdqsObjectKey getKey(T object) { + Mapper mapper = (Mapper) mappers.getOrDefault(object.type(), defaultMapper); + return mapper.getKey(object); } @RequiredArgsConstructor - private static class JsonConverter implements Converter { + private static abstract class JsonMapper implements Mapper { private static final SimpleModule module = new SimpleModule(); - private static final ObjectMapper mapper = JsonMapper.builder() + private static final ObjectMapper mapper = com.fasterxml.jackson.databind.json.JsonMapper.builder() .visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) .visibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE) .visibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE) @@ -267,17 +288,19 @@ public class EdqsConverter { @SneakyThrows @Override - public T deserialize(ObjectType objectType, byte[] bytes) { + public T deserialize(ObjectType objectType, byte[] bytes, boolean onlyKey) { return mapper.readValue(bytes, this.type); } } - private interface Converter { + private interface Mapper { byte[] serialize(ObjectType type, T value) throws Exception; - T deserialize(ObjectType type, byte[] bytes) throws Exception; + T deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception; + + EdqsObjectKey getKey(T object); } diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java new file mode 100644 index 0000000000..bd6ed22cd1 --- /dev/null +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/EdqsMapper.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.edqs.util; + +import org.thingsboard.server.common.data.ObjectType; +import org.thingsboard.server.common.data.edqs.EdqsObject; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; + +public interface EdqsMapper { + + byte[] serialize(T value); + + EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey); + + EdqsObjectKey getKey(T object); + +} diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java index 4a09a76e9a..ba3263eec2 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/util/VersionsStore.java @@ -18,6 +18,7 @@ package org.thingsboard.server.edqs.util; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.edqs.EdqsObjectKey; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -25,11 +26,15 @@ import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public class VersionsStore { - private final Cache versions = Caffeine.newBuilder() - .expireAfterWrite(24, TimeUnit.HOURS) - .build(); + private final Cache versions; - public boolean isNew(String key, Long version) { + public VersionsStore(int ttlMinutes) { + this.versions = Caffeine.newBuilder() + .expireAfterWrite(ttlMinutes, TimeUnit.MINUTES) + .build(); + } + + public boolean isNew(EdqsObjectKey key, Long version) { AtomicBoolean isNew = new AtomicBoolean(false); versions.asMap().compute(key, (k, prevVersion) -> { if (prevVersion == null || prevVersion <= version) { diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index e874c81444..cc5fc88f72 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -1833,7 +1833,6 @@ message FromEdqsMsg { } message EdqsEventMsg { - string key = 1; string objectType = 2; bytes data = 3; string eventType = 4; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java index 4a9b9f2a28..18f46bd358 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/edqs/EdqsConfig.java @@ -46,6 +46,8 @@ public class EdqsConfig { private int maxRequestTimeout; @Value("${queue.edqs.request_executor_size:50}") private int requestExecutorSize; + @Value("${queue.edqs.versions_cache_ttl:60}") + private int versionsCacheTtl; public String getLabel() { if (partitioningStrategy == EdqsPartitioningStrategy.NONE) { diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index 0815187704..9ac8455abb 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -73,6 +73,8 @@ queue: max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}" # Thread pool size for EDQS requests executor request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}" + # Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process. + versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}" # Strings longer than this threshold will be compressed string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}" stats: diff --git a/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java b/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java index 330d22a3c6..a7f80fdffc 100644 --- a/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java +++ b/edqs/src/test/java/org/thingsboard/server/edqs/repo/AbstractEDQTest.java @@ -60,7 +60,8 @@ import org.thingsboard.server.common.data.query.StringFilterPredicate; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.stats.DummyEdqsStatsService; -import org.thingsboard.server.edqs.util.EdqsConverter; +import org.thingsboard.server.edqs.util.DefaultEdqsMapper; +import org.thingsboard.server.edqs.util.EdqsMapper; import java.util.Collections; import java.util.List; @@ -79,7 +80,7 @@ public abstract class AbstractEDQTest { @Autowired protected DefaultEdqsRepository repository; @Autowired - protected EdqsConverter edqsConverter; + protected EdqsMapper edqsMapper; @MockBean private DummyEdqsStatsService edqsStatsService; @@ -244,12 +245,12 @@ public abstract class AbstractEDQTest { } protected void addOrUpdate(EntityType entityType, Object entity) { - addOrUpdate(EdqsConverter.toEntity(entityType, entity)); + addOrUpdate(DefaultEdqsMapper.toEntity(entityType, entity)); } protected void addOrUpdate(EdqsObject edqsObject) { - byte[] serialized = edqsConverter.serialize(edqsObject.type(), edqsObject); - edqsObject = edqsConverter.deserialize(edqsObject.type(), serialized); + byte[] serialized = edqsMapper.serialize(edqsObject); + edqsObject = edqsMapper.deserialize(edqsObject.type(), serialized, false); repository.get(tenantId).addOrUpdate(edqsObject); }