Refactoring and improvements for EDQS
This commit is contained in:
parent
d21b7fe064
commit
069f052122
@ -39,6 +39,7 @@ import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsEventType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsState;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsSyncStatus;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsSyncRequest;
|
||||
import org.thingsboard.server.common.data.edqs.Entity;
|
||||
@ -55,7 +56,8 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.edqs.processor.EdqsProducer;
|
||||
import org.thingsboard.server.edqs.state.EdqsPartitionService;
|
||||
import org.thingsboard.server.edqs.util.EdqsConverter;
|
||||
import org.thingsboard.server.edqs.util.DefaultEdqsMapper;
|
||||
import org.thingsboard.server.edqs.util.EdqsMapper;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
@ -82,7 +84,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class DefaultEdqsService implements EdqsService {
|
||||
|
||||
private final EdqsClientQueueFactory queueFactory;
|
||||
private final EdqsConverter edqsConverter;
|
||||
private final EdqsMapper edqsMapper;
|
||||
private final EdqsSyncService edqsSyncService;
|
||||
private final EdqsApiService edqsApiService;
|
||||
private final DistributedLockService distributedLockService;
|
||||
@ -182,7 +184,12 @@ public class DefaultEdqsService implements EdqsService {
|
||||
log.info("Processing system msg {}", msg);
|
||||
try {
|
||||
if (msg.getApiEnabled() != null) {
|
||||
state.setApiEnabled(msg.getApiEnabled());
|
||||
if (msg.getApiEnabled()) {
|
||||
state.setApiMode(EdqsApiMode.ENABLED);
|
||||
} else {
|
||||
state.setApiMode(EdqsApiMode.DISABLED);
|
||||
}
|
||||
log.info("New state: {}", state);
|
||||
}
|
||||
if (msg.getEdqsReady() != null) {
|
||||
onEdqsReady(msg.getEdqsReady());
|
||||
@ -252,8 +259,8 @@ public class DefaultEdqsService implements EdqsService {
|
||||
|
||||
if (state.isApiReady()) {
|
||||
if (autoEnableApi) {
|
||||
if (state.getApiEnabled() == null) {
|
||||
state.setApiEnabled(true);
|
||||
if (state.getApiMode() == null || state.getApiMode() == EdqsApiMode.AUTO_DISABLED) {
|
||||
state.setApiMode(EdqsApiMode.AUTO_ENABLED);
|
||||
log.info("New state: {}. Auto-enabled EDQS API", state);
|
||||
} else {
|
||||
log.info("New state: {}. API mode left as is", state);
|
||||
@ -263,7 +270,7 @@ public class DefaultEdqsService implements EdqsService {
|
||||
}
|
||||
} else {
|
||||
if (state.isApiEnabled()) {
|
||||
state.setApiEnabled(false);
|
||||
state.setApiMode(EdqsApiMode.AUTO_DISABLED);
|
||||
log.info("New state: {}. Disabled EDQS API", state);
|
||||
} else {
|
||||
log.info("New state: {}. API left disabled", state);
|
||||
@ -284,7 +291,7 @@ public class DefaultEdqsService implements EdqsService {
|
||||
log.trace("[{}][{}] Ignoring update event, type {} not supported", tenantId, entityId, entityType);
|
||||
return;
|
||||
}
|
||||
onUpdate(tenantId, objectType, edqsConverter.toEntity(entityType, entity));
|
||||
onUpdate(tenantId, objectType, DefaultEdqsMapper.toEntity(entityType, entity));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -311,12 +318,11 @@ public class DefaultEdqsService implements EdqsService {
|
||||
protected void processEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType, EdqsObject object) {
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
String key = object.key();
|
||||
String key = object.stringKey();
|
||||
Long version = object.version();
|
||||
EdqsEventMsg.Builder eventMsg = EdqsEventMsg.newBuilder()
|
||||
.setKey(key)
|
||||
.setObjectType(objectType.name())
|
||||
.setData(ByteString.copyFrom(edqsConverter.serialize(objectType, object)))
|
||||
.setData(ByteString.copyFrom(edqsMapper.serialize(object)))
|
||||
.setEventType(eventType.name());
|
||||
if (version != null) {
|
||||
eventMsg.setVersion(version);
|
||||
|
||||
@ -1782,6 +1782,8 @@ queue:
|
||||
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
|
||||
# Thread pool size for EDQS requests executor
|
||||
request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}"
|
||||
# Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process.
|
||||
versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}"
|
||||
# Strings longer than this threshold will be compressed
|
||||
string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}"
|
||||
stats:
|
||||
|
||||
@ -15,10 +15,15 @@
|
||||
*/
|
||||
package org.thingsboard.server.controller;
|
||||
|
||||
import org.assertj.core.api.ThrowingConsumer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.mock.mockito.MockBean;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsState;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
|
||||
import org.thingsboard.server.common.data.query.EntityData;
|
||||
@ -26,9 +31,11 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.edqs.util.EdqsRocksDb;
|
||||
import org.thingsboard.server.queue.discovery.DiscoveryService;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.awaitility.Awaitility.await;
|
||||
|
||||
@DaoSqlTest
|
||||
@ -39,13 +46,16 @@ import static org.awaitility.Awaitility.await;
|
||||
"queue.edqs.api.supported=true",
|
||||
"queue.edqs.api.auto_enable=true",
|
||||
"queue.edqs.mode=local",
|
||||
"queue.edqs.readiness_check_interval=1000"
|
||||
"queue.edqs.readiness_check_interval=500"
|
||||
})
|
||||
public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
|
||||
|
||||
@Autowired
|
||||
private EdqsService edqsService;
|
||||
|
||||
@Autowired
|
||||
private DiscoveryService discoveryService;
|
||||
|
||||
@MockBean // so that we don't do backup for tests
|
||||
private EdqsRocksDb edqsRocksDb;
|
||||
|
||||
@ -66,4 +76,61 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
|
||||
result -> result == expectedResult);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEdqsState() {
|
||||
assertThat(edqsService.getState().getApiMode()).isEqualTo(EdqsApiMode.AUTO_ENABLED);
|
||||
|
||||
// notifying EDQS is not ready: API should be auto-disabled
|
||||
discoveryService.setReady(false);
|
||||
verifyState(state -> {
|
||||
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED);
|
||||
assertThat(state.getEdqsReady()).isFalse();
|
||||
assertThat(state.isApiEnabled()).isFalse();
|
||||
});
|
||||
|
||||
// manually disabling API
|
||||
edqsService.processSystemRequest(ToCoreEdqsRequest.builder()
|
||||
.apiEnabled(false)
|
||||
.build());
|
||||
verifyState(state -> {
|
||||
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED);
|
||||
assertThat(state.getEdqsReady()).isFalse();
|
||||
assertThat(state.isApiEnabled()).isFalse();
|
||||
});
|
||||
|
||||
// notifying EDQS is ready: API should not be enabled automatically because manually disabled previously
|
||||
discoveryService.setReady(true);
|
||||
verifyState(state -> {
|
||||
assertThat(state.getEdqsReady()).isTrue();
|
||||
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.DISABLED);
|
||||
assertThat(state.isApiEnabled()).isFalse();
|
||||
});
|
||||
|
||||
// manually enabling API
|
||||
edqsService.processSystemRequest(ToCoreEdqsRequest.builder()
|
||||
.apiEnabled(true)
|
||||
.build());
|
||||
verifyState(state -> {
|
||||
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.ENABLED);
|
||||
assertThat(state.getEdqsReady()).isTrue();
|
||||
assertThat(state.isApiEnabled()).isTrue();
|
||||
});
|
||||
|
||||
// notifying EDQS is not ready: API should be auto-disabled
|
||||
discoveryService.setReady(false);
|
||||
verifyState(state -> {
|
||||
assertThat(state.getApiMode()).isEqualTo(EdqsApiMode.AUTO_DISABLED);
|
||||
assertThat(state.getEdqsReady()).isFalse();
|
||||
assertThat(state.isApiEnabled()).isFalse();
|
||||
});
|
||||
|
||||
discoveryService.setReady(true);
|
||||
}
|
||||
|
||||
private void verifyState(ThrowingConsumer<EdqsState> assertion) {
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||
assertThat(edqsService.getState()).satisfies(assertion);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -25,6 +25,8 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@ -58,7 +60,7 @@ public class AttributeKv implements EdqsObject {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String key() {
|
||||
public String stringKey() {
|
||||
return "a_" + entityId + "_" + scope + "_" + key;
|
||||
}
|
||||
|
||||
@ -72,4 +74,6 @@ public class AttributeKv implements EdqsObject {
|
||||
return ObjectType.ATTRIBUTE_KV;
|
||||
}
|
||||
|
||||
public record Key(UUID entityId, AttributeScope scope, int key) implements EdqsObjectKey {}
|
||||
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ import org.thingsboard.server.common.data.ObjectType;
|
||||
public interface EdqsObject {
|
||||
|
||||
@JsonIgnore
|
||||
String key();
|
||||
String stringKey();
|
||||
|
||||
@JsonIgnore
|
||||
Long version();
|
||||
|
||||
@ -0,0 +1,18 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data.edqs;
|
||||
|
||||
public interface EdqsObjectKey {}
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.edqs;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
import org.apache.commons.lang3.BooleanUtils;
|
||||
|
||||
@Getter
|
||||
@ -26,9 +27,10 @@ import org.apache.commons.lang3.BooleanUtils;
|
||||
public class EdqsState {
|
||||
|
||||
private Boolean edqsReady;
|
||||
@Setter
|
||||
private EdqsSyncStatus syncStatus;
|
||||
|
||||
private Boolean apiEnabled; // null until auto-enabled or set manually
|
||||
@Setter
|
||||
private EdqsApiMode apiMode;
|
||||
|
||||
public boolean setEdqsReady(boolean ready) {
|
||||
boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.edqsReady, false) != ready;
|
||||
@ -36,22 +38,12 @@ public class EdqsState {
|
||||
return changed;
|
||||
}
|
||||
|
||||
public void setSyncStatus(EdqsSyncStatus syncStatus) {
|
||||
this.syncStatus = syncStatus;
|
||||
}
|
||||
|
||||
public boolean setApiEnabled(boolean apiEnabled) {
|
||||
boolean changed = BooleanUtils.toBooleanDefaultIfNull(this.apiEnabled, false) != apiEnabled;
|
||||
this.apiEnabled = apiEnabled;
|
||||
return changed;
|
||||
}
|
||||
|
||||
public boolean isApiReady() {
|
||||
return edqsReady && syncStatus == EdqsSyncStatus.FINISHED;
|
||||
}
|
||||
|
||||
public boolean isApiEnabled() {
|
||||
return apiEnabled != null && apiEnabled;
|
||||
return apiMode != null && (apiMode == EdqsApiMode.ENABLED || apiMode == EdqsApiMode.AUTO_ENABLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -59,7 +51,7 @@ public class EdqsState {
|
||||
return '[' +
|
||||
"EDQS ready: " + edqsReady +
|
||||
", sync status: " + syncStatus +
|
||||
", API enabled: " + apiEnabled +
|
||||
", API mode: " + apiMode +
|
||||
']';
|
||||
}
|
||||
|
||||
@ -70,4 +62,11 @@ public class EdqsState {
|
||||
FAILED
|
||||
}
|
||||
|
||||
public enum EdqsApiMode {
|
||||
ENABLED,
|
||||
AUTO_ENABLED,
|
||||
DISABLED,
|
||||
AUTO_DISABLED
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -51,7 +51,7 @@ public class Entity implements EdqsObject {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String key() {
|
||||
public String stringKey() {
|
||||
return "e_" + fields.getId().toString();
|
||||
}
|
||||
|
||||
@ -65,4 +65,6 @@ public class Entity implements EdqsObject {
|
||||
return ObjectType.fromEntityType(type);
|
||||
}
|
||||
|
||||
public record Key(UUID id) implements EdqsObjectKey {}
|
||||
|
||||
}
|
||||
|
||||
@ -24,6 +24,8 @@ import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@ -53,7 +55,8 @@ public class LatestTsKv implements EdqsObject {
|
||||
this.version = version != null ? version : 0L;
|
||||
}
|
||||
|
||||
public String key() {
|
||||
@Override
|
||||
public String stringKey() {
|
||||
return "l_" + entityId + "_" + key;
|
||||
}
|
||||
|
||||
@ -67,4 +70,6 @@ public class LatestTsKv implements EdqsObject {
|
||||
return ObjectType.LATEST_TS_KV;
|
||||
}
|
||||
|
||||
public record Key(UUID entityId, int key) implements EdqsObjectKey {}
|
||||
|
||||
}
|
||||
|
||||
@ -29,9 +29,7 @@ public interface EntityFields {
|
||||
|
||||
Logger log = LoggerFactory.getLogger(EntityFields.class);
|
||||
|
||||
default UUID getId() {
|
||||
return null;
|
||||
}
|
||||
UUID getId();
|
||||
|
||||
default UUID getTenantId() {
|
||||
return null;
|
||||
@ -147,6 +145,7 @@ public interface EntityFields {
|
||||
|
||||
default String getAsString(String key) {
|
||||
return switch (key) {
|
||||
case "id" -> getId().toString();
|
||||
case "createdTime" -> Long.toString(getCreatedTime());
|
||||
case "title" -> getName();
|
||||
case "type" -> getType();
|
||||
|
||||
@ -27,10 +27,12 @@ import org.thingsboard.server.common.data.BaseDataWithAdditionalInfo;
|
||||
import org.thingsboard.server.common.data.HasVersion;
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.validation.Length;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@Slf4j
|
||||
@Schema
|
||||
@ -118,8 +120,8 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject {
|
||||
BaseDataWithAdditionalInfo.setJson(addInfo, json -> this.additionalInfo = json, bytes -> this.additionalInfoBytes = bytes);
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
public String key() {
|
||||
@Override
|
||||
public String stringKey() {
|
||||
return "r_" + from + "_" + to + "_" + typeGroup + "_" + type;
|
||||
}
|
||||
|
||||
@ -133,4 +135,6 @@ public class EntityRelation implements HasVersion, Serializable, EdqsObject {
|
||||
return ObjectType.RELATION;
|
||||
}
|
||||
|
||||
public record Key(UUID from, UUID to, RelationTypeGroup typeGroup, String type) implements EdqsObjectKey {}
|
||||
|
||||
}
|
||||
|
||||
@ -45,7 +45,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.edqs.repo.EdqsRepository;
|
||||
import org.thingsboard.server.edqs.state.EdqsPartitionService;
|
||||
import org.thingsboard.server.edqs.state.EdqsStateService;
|
||||
import org.thingsboard.server.edqs.util.EdqsConverter;
|
||||
import org.thingsboard.server.edqs.util.EdqsMapper;
|
||||
import org.thingsboard.server.edqs.util.VersionsStore;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
|
||||
@ -81,7 +81,7 @@ import static org.thingsboard.server.common.msg.queue.TopicPartitionInfo.withTop
|
||||
public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> {
|
||||
|
||||
private final EdqsQueueFactory queueFactory;
|
||||
private final EdqsConverter converter;
|
||||
private final EdqsMapper mapper;
|
||||
private final EdqsRepository repository;
|
||||
private final EdqsConfig config;
|
||||
private final EdqsExecutors edqsExecutors;
|
||||
@ -92,8 +92,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
private PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer;
|
||||
private PartitionedQueueResponseTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> responseTemplate;
|
||||
private ListeningExecutorService requestExecutor;
|
||||
|
||||
private final VersionsStore versionsStore = new VersionsStore();
|
||||
private VersionsStore versionsStore;
|
||||
private final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
@Getter
|
||||
@ -110,6 +109,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
}
|
||||
};
|
||||
requestExecutor = edqsExecutors.getRequestExecutor();
|
||||
versionsStore = new VersionsStore(config.getVersionsCacheTtl());
|
||||
|
||||
eventConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
|
||||
.queueKey(new QueueKey(ServiceType.EDQS, config.getEventsTopic()))
|
||||
@ -224,22 +224,20 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
TenantId tenantId = getTenantId(edqsMsg);
|
||||
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
|
||||
EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType());
|
||||
String key = eventMsg.getKey();
|
||||
Long version = eventMsg.hasVersion() ? eventMsg.getVersion() : null;
|
||||
EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), false);
|
||||
|
||||
if (version != null) {
|
||||
if (!versionsStore.isNew(key, version)) {
|
||||
if (!versionsStore.isNew(mapper.getKey(object), version)) {
|
||||
return;
|
||||
}
|
||||
} else if (!ObjectType.unversionedTypes.contains(objectType)) {
|
||||
log.warn("[{}] {} {} doesn't have version", tenantId, objectType, key);
|
||||
log.warn("[{}] {} doesn't have version: {}", tenantId, objectType, object);
|
||||
}
|
||||
if (backup) {
|
||||
stateService.save(tenantId, objectType, key, eventType, edqsMsg);
|
||||
stateService.save(tenantId, objectType, object.stringKey(), eventType, edqsMsg);
|
||||
}
|
||||
|
||||
EdqsObject object = converter.deserialize(objectType, eventMsg.getData().toByteArray());
|
||||
log.debug("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version);
|
||||
int count = counter.incrementAndGet();
|
||||
if (count % 100000 == 0) {
|
||||
log.info("Processed {} events", count);
|
||||
@ -251,6 +249,7 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
|
||||
.eventType(eventType)
|
||||
.object(object)
|
||||
.build();
|
||||
log.debug("Processing event: {}", event);
|
||||
repository.processEvent(event);
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,11 +22,13 @@ import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsEventType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.edqs.processor.EdqsProcessor;
|
||||
import org.thingsboard.server.edqs.processor.EdqsProducer;
|
||||
import org.thingsboard.server.edqs.util.EdqsMapper;
|
||||
import org.thingsboard.server.edqs.util.VersionsStore;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
|
||||
@ -63,6 +65,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
private final KafkaEdqsQueueFactory queueFactory;
|
||||
private final DiscoveryService discoveryService;
|
||||
private final EdqsExecutors edqsExecutors;
|
||||
private final EdqsMapper mapper;
|
||||
@Autowired
|
||||
@Lazy
|
||||
private EdqsProcessor edqsProcessor;
|
||||
@ -71,8 +74,8 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
private KafkaQueueStateService<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<ToEdqsMsg>> queueStateService;
|
||||
private QueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventsToBackupConsumer;
|
||||
private EdqsProducer stateProducer;
|
||||
private VersionsStore versionsStore;
|
||||
|
||||
private final VersionsStore versionsStore = new VersionsStore();
|
||||
private final AtomicInteger stateReadCount = new AtomicInteger();
|
||||
private final AtomicInteger eventsReadCount = new AtomicInteger();
|
||||
|
||||
@ -80,6 +83,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
|
||||
@Override
|
||||
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
|
||||
versionsStore = new VersionsStore(config.getVersionsCacheTtl());
|
||||
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
|
||||
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
|
||||
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
|
||||
@ -122,22 +126,25 @@ public class KafkaEdqsStateService implements EdqsStateService {
|
||||
|
||||
if (msg.hasEventMsg()) {
|
||||
EdqsEventMsg eventMsg = msg.getEventMsg();
|
||||
String key = eventMsg.getKey();
|
||||
int count = eventsReadCount.incrementAndGet();
|
||||
if (count % 100000 == 0) {
|
||||
log.info("[events-to-backup] Processed {} msgs", count);
|
||||
}
|
||||
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
|
||||
EdqsObject object = mapper.deserialize(objectType, eventMsg.getData().toByteArray(), true);
|
||||
|
||||
if (eventMsg.hasVersion()) {
|
||||
if (!versionsStore.isNew(key, eventMsg.getVersion())) {
|
||||
if (!versionsStore.isNew(mapper.getKey(object), eventMsg.getVersion())) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
TenantId tenantId = getTenantId(msg);
|
||||
ObjectType objectType = ObjectType.valueOf(eventMsg.getObjectType());
|
||||
EdqsEventType eventType = EdqsEventType.valueOf(eventMsg.getEventType());
|
||||
String key = object.stringKey();
|
||||
log.trace("[{}] Saving to backup [{}] [{}] [{}]", tenantId, objectType, eventType, key);
|
||||
stateProducer.send(tenantId, objectType, key, msg);
|
||||
stateProducer.send(tenantId, objectType, object.stringKey(), msg);
|
||||
|
||||
int count = eventsReadCount.incrementAndGet();
|
||||
if (count % 100000 == 0) {
|
||||
log.info("[events-to-backup] Processed {} msgs", count);
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.error("Failed to process message: {}", queueMsg, t);
|
||||
|
||||
@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.google.protobuf.ByteString;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@ -36,6 +35,7 @@ import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.AttributeKv;
|
||||
import org.thingsboard.server.common.data.edqs.DataPoint;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
|
||||
import org.thingsboard.server.common.data.edqs.Entity;
|
||||
import org.thingsboard.server.common.data.edqs.LatestTsKv;
|
||||
import org.thingsboard.server.common.data.edqs.fields.FieldsUtil;
|
||||
@ -52,6 +52,7 @@ import org.thingsboard.server.edqs.data.dp.DoubleDataPoint;
|
||||
import org.thingsboard.server.edqs.data.dp.JsonDataPoint;
|
||||
import org.thingsboard.server.edqs.data.dp.LongDataPoint;
|
||||
import org.thingsboard.server.edqs.data.dp.StringDataPoint;
|
||||
import org.thingsboard.server.edqs.repo.KeyDictionary;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.DataPointProto;
|
||||
import org.xerial.snappy.Snappy;
|
||||
@ -65,19 +66,29 @@ import java.util.UUID;
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j
|
||||
public class EdqsConverter {
|
||||
public class DefaultEdqsMapper implements EdqsMapper {
|
||||
|
||||
private final EdqsStatsService edqsStatsService;
|
||||
|
||||
@Value("${queue.edqs.string_compression_length_threshold:512}")
|
||||
private int stringCompressionLengthThreshold;
|
||||
|
||||
private final Map<ObjectType, Converter<? extends EdqsObject>> converters = new HashMap<>();
|
||||
private final Converter<Entity> defaultConverter = new JsonConverter<>(Entity.class);
|
||||
private final Map<ObjectType, Mapper<? extends EdqsObject>> mappers = new HashMap<>();
|
||||
private final Mapper<Entity> defaultMapper = new JsonMapper<>(Entity.class) {
|
||||
@Override
|
||||
public EdqsObjectKey getKey(Entity entity) {
|
||||
return new Entity.Key(entity.getFields().getId());
|
||||
}
|
||||
};
|
||||
|
||||
{
|
||||
converters.put(ObjectType.RELATION, new JsonConverter<>(EntityRelation.class));
|
||||
converters.put(ObjectType.ATTRIBUTE_KV, new Converter<AttributeKv>() {
|
||||
mappers.put(ObjectType.RELATION, new JsonMapper<>(EntityRelation.class) {
|
||||
@Override
|
||||
public EdqsObjectKey getKey(EntityRelation relation) {
|
||||
return new EntityRelation.Key(relation.getFrom().getId(), relation.getTo().getId(), relation.getTypeGroup(), relation.getType());
|
||||
}
|
||||
});
|
||||
mappers.put(ObjectType.ATTRIBUTE_KV, new Mapper<AttributeKv>() {
|
||||
@Override
|
||||
public byte[] serialize(ObjectType type, AttributeKv attributeKv) {
|
||||
var proto = TransportProtos.AttributeKvProto.newBuilder()
|
||||
@ -94,12 +105,12 @@ public class EdqsConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public AttributeKv deserialize(ObjectType type, byte[] bytes) throws Exception {
|
||||
public AttributeKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception {
|
||||
TransportProtos.AttributeKvProto proto = TransportProtos.AttributeKvProto.parseFrom(bytes);
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()),
|
||||
new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
AttributeScope scope = AttributeScope.values()[proto.getScope().getNumber()];
|
||||
DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null;
|
||||
DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint());
|
||||
return AttributeKv.builder()
|
||||
.entityId(entityId)
|
||||
.scope(scope)
|
||||
@ -108,8 +119,13 @@ public class EdqsConverter {
|
||||
.dataPoint(dataPoint)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EdqsObjectKey getKey(AttributeKv attributeKv) {
|
||||
return new AttributeKv.Key(attributeKv.getEntityId().getId(), attributeKv.getScope(), KeyDictionary.get(attributeKv.getKey()));
|
||||
}
|
||||
});
|
||||
converters.put(ObjectType.LATEST_TS_KV, new Converter<LatestTsKv>() {
|
||||
mappers.put(ObjectType.LATEST_TS_KV, new Mapper<LatestTsKv>() {
|
||||
@Override
|
||||
public byte[] serialize(ObjectType type, LatestTsKv latestTsKv) {
|
||||
var proto = TransportProtos.LatestTsKvProto.newBuilder()
|
||||
@ -125,11 +141,11 @@ public class EdqsConverter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public LatestTsKv deserialize(ObjectType type, byte[] bytes) throws Exception {
|
||||
public LatestTsKv deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception {
|
||||
TransportProtos.LatestTsKvProto proto = TransportProtos.LatestTsKvProto.parseFrom(bytes);
|
||||
EntityId entityId = EntityIdFactory.getByTypeAndUuid(ProtoUtils.fromProto(proto.getEntityType()),
|
||||
new UUID(proto.getEntityIdMSB(), proto.getEntityIdLSB()));
|
||||
DataPoint dataPoint = proto.hasDataPoint() ? fromDataPointProto(proto.getDataPoint()) : null;
|
||||
DataPoint dataPoint = onlyKey || !proto.hasDataPoint() ? null : fromDataPointProto(proto.getDataPoint());
|
||||
return LatestTsKv.builder()
|
||||
.entityId(entityId)
|
||||
.key(proto.getKey())
|
||||
@ -137,6 +153,11 @@ public class EdqsConverter {
|
||||
.dataPoint(dataPoint)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EdqsObjectKey getKey(LatestTsKv latestTsKv) {
|
||||
return new LatestTsKv.Key(latestTsKv.getEntityId().getId(), KeyDictionary.get(latestTsKv.getKey()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@ -223,30 +244,30 @@ public class EdqsConverter {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@SneakyThrows
|
||||
public <T extends EdqsObject> byte[] serialize(ObjectType type, T value) {
|
||||
Converter<T> converter = (Converter<T>) converters.get(type);
|
||||
if (converter != null) {
|
||||
return converter.serialize(type, value);
|
||||
} else {
|
||||
return defaultConverter.serialize(type, (Entity) value);
|
||||
}
|
||||
public <T extends EdqsObject> byte[] serialize(T value) {
|
||||
ObjectType type = value.type();
|
||||
Mapper<T> mapper = (Mapper<T>) mappers.getOrDefault(type, defaultMapper);
|
||||
return mapper.serialize(type, value);
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public EdqsObject deserialize(ObjectType type, byte[] bytes) {
|
||||
Converter<? extends EdqsObject> converter = converters.get(type);
|
||||
if (converter != null) {
|
||||
return converter.deserialize(type, bytes);
|
||||
} else {
|
||||
return defaultConverter.deserialize(type, bytes);
|
||||
}
|
||||
public EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey) {
|
||||
Mapper<? extends EdqsObject> mapper = mappers.getOrDefault(type, defaultMapper);
|
||||
return mapper.deserialize(type, bytes, onlyKey);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@SneakyThrows
|
||||
public <T extends EdqsObject> EdqsObjectKey getKey(T object) {
|
||||
Mapper<T> mapper = (Mapper<T>) mappers.getOrDefault(object.type(), defaultMapper);
|
||||
return mapper.getKey(object);
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class JsonConverter<T> implements Converter<T> {
|
||||
private static abstract class JsonMapper<T> implements Mapper<T> {
|
||||
|
||||
private static final SimpleModule module = new SimpleModule();
|
||||
private static final ObjectMapper mapper = JsonMapper.builder()
|
||||
private static final ObjectMapper mapper = com.fasterxml.jackson.databind.json.JsonMapper.builder()
|
||||
.visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
|
||||
.visibility(PropertyAccessor.GETTER, JsonAutoDetect.Visibility.NONE)
|
||||
.visibility(PropertyAccessor.IS_GETTER, JsonAutoDetect.Visibility.NONE)
|
||||
@ -267,17 +288,19 @@ public class EdqsConverter {
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public T deserialize(ObjectType objectType, byte[] bytes) {
|
||||
public T deserialize(ObjectType objectType, byte[] bytes, boolean onlyKey) {
|
||||
return mapper.readValue(bytes, this.type);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private interface Converter<T> {
|
||||
private interface Mapper<T> {
|
||||
|
||||
byte[] serialize(ObjectType type, T value) throws Exception;
|
||||
|
||||
T deserialize(ObjectType type, byte[] bytes) throws Exception;
|
||||
T deserialize(ObjectType type, byte[] bytes, boolean onlyKey) throws Exception;
|
||||
|
||||
EdqsObjectKey getKey(T object);
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,30 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.edqs.util;
|
||||
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
|
||||
|
||||
public interface EdqsMapper {
|
||||
|
||||
<T extends EdqsObject> byte[] serialize(T value);
|
||||
|
||||
EdqsObject deserialize(ObjectType type, byte[] bytes, boolean onlyKey);
|
||||
|
||||
<T extends EdqsObject> EdqsObjectKey getKey(T object);
|
||||
|
||||
}
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.edqs.util;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObjectKey;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -25,11 +26,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@Slf4j
|
||||
public class VersionsStore {
|
||||
|
||||
private final Cache<String, Long> versions = Caffeine.newBuilder()
|
||||
.expireAfterWrite(24, TimeUnit.HOURS)
|
||||
.build();
|
||||
private final Cache<EdqsObjectKey, Long> versions;
|
||||
|
||||
public boolean isNew(String key, Long version) {
|
||||
public VersionsStore(int ttlMinutes) {
|
||||
this.versions = Caffeine.newBuilder()
|
||||
.expireAfterWrite(ttlMinutes, TimeUnit.MINUTES)
|
||||
.build();
|
||||
}
|
||||
|
||||
public boolean isNew(EdqsObjectKey key, Long version) {
|
||||
AtomicBoolean isNew = new AtomicBoolean(false);
|
||||
versions.asMap().compute(key, (k, prevVersion) -> {
|
||||
if (prevVersion == null || prevVersion <= version) {
|
||||
|
||||
@ -1833,7 +1833,6 @@ message FromEdqsMsg {
|
||||
}
|
||||
|
||||
message EdqsEventMsg {
|
||||
string key = 1;
|
||||
string objectType = 2;
|
||||
bytes data = 3;
|
||||
string eventType = 4;
|
||||
|
||||
@ -46,6 +46,8 @@ public class EdqsConfig {
|
||||
private int maxRequestTimeout;
|
||||
@Value("${queue.edqs.request_executor_size:50}")
|
||||
private int requestExecutorSize;
|
||||
@Value("${queue.edqs.versions_cache_ttl:60}")
|
||||
private int versionsCacheTtl;
|
||||
|
||||
public String getLabel() {
|
||||
if (partitioningStrategy == EdqsPartitioningStrategy.NONE) {
|
||||
|
||||
@ -73,6 +73,8 @@ queue:
|
||||
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:20000}"
|
||||
# Thread pool size for EDQS requests executor
|
||||
request_executor_size: "${TB_EDQS_REQUEST_EXECUTOR_SIZE:50}"
|
||||
# Time to live for EDQS versions cache in minutes. Must be bigger than the time taken for the sync process.
|
||||
versions_cache_ttl: "${TB_EDQS_VERSIONS_CACHE_TTL_MINUTES:60}"
|
||||
# Strings longer than this threshold will be compressed
|
||||
string_compression_length_threshold: "${TB_EDQS_STRING_COMPRESSION_LENGTH_THRESHOLD:512}"
|
||||
stats:
|
||||
|
||||
@ -60,7 +60,8 @@ import org.thingsboard.server.common.data.query.StringFilterPredicate;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
|
||||
import org.thingsboard.server.common.stats.DummyEdqsStatsService;
|
||||
import org.thingsboard.server.edqs.util.EdqsConverter;
|
||||
import org.thingsboard.server.edqs.util.DefaultEdqsMapper;
|
||||
import org.thingsboard.server.edqs.util.EdqsMapper;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -79,7 +80,7 @@ public abstract class AbstractEDQTest {
|
||||
@Autowired
|
||||
protected DefaultEdqsRepository repository;
|
||||
@Autowired
|
||||
protected EdqsConverter edqsConverter;
|
||||
protected EdqsMapper edqsMapper;
|
||||
@MockBean
|
||||
private DummyEdqsStatsService edqsStatsService;
|
||||
|
||||
@ -244,12 +245,12 @@ public abstract class AbstractEDQTest {
|
||||
}
|
||||
|
||||
protected void addOrUpdate(EntityType entityType, Object entity) {
|
||||
addOrUpdate(EdqsConverter.toEntity(entityType, entity));
|
||||
addOrUpdate(DefaultEdqsMapper.toEntity(entityType, entity));
|
||||
}
|
||||
|
||||
protected void addOrUpdate(EdqsObject edqsObject) {
|
||||
byte[] serialized = edqsConverter.serialize(edqsObject.type(), edqsObject);
|
||||
edqsObject = edqsConverter.deserialize(edqsObject.type(), serialized);
|
||||
byte[] serialized = edqsMapper.serialize(edqsObject);
|
||||
edqsObject = edqsMapper.deserialize(edqsObject.type(), serialized, false);
|
||||
repository.get(tenantId).addOrUpdate(edqsObject);
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user