merged with edqs branch
This commit is contained in:
commit
96f48dfcc6
@ -0,0 +1,108 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.service.edqs;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.common.util.JacksonUtil;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
|
||||
import org.thingsboard.server.edqs.state.EdqsPartitionService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
|
||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.provider.EdqsClientQueueFactory;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnExpression("'${queue.edqs.api_enabled:true}' == 'true' && ('${service.type:null}' == 'monolith' || '${service.type:null}' == 'tb-core')")
|
||||
public class DefaultEdqsApiService implements EdqsApiService {
|
||||
|
||||
private final EdqsPartitionService edqsPartitionService;
|
||||
private final EdqsClientQueueFactory queueFactory;
|
||||
private TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> requestTemplate;
|
||||
|
||||
private Boolean apiEnabled = null;
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
requestTemplate = queueFactory.createEdqsRequestTemplate();
|
||||
requestTemplate.init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
|
||||
var requestMsg = ToEdqsMsg.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setTs(System.currentTimeMillis())
|
||||
.setRequestMsg(TransportProtos.EdqsRequestMsg.newBuilder()
|
||||
.setValue(JacksonUtil.toString(request))
|
||||
.build());
|
||||
if (customerId != null && !customerId.isNullUid()) {
|
||||
requestMsg.setCustomerIdMSB(customerId.getId().getMostSignificantBits());
|
||||
requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
|
||||
}
|
||||
|
||||
Integer partition = edqsPartitionService.resolvePartition(tenantId);
|
||||
ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition);
|
||||
return Futures.transform(resultFuture, msg -> {
|
||||
TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg();
|
||||
return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class);
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return Boolean.TRUE.equals(apiEnabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
if (enabled) {
|
||||
log.info("Enabling EDQS API");
|
||||
} else {
|
||||
log.info("Disabling EDQS API");
|
||||
}
|
||||
apiEnabled = enabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupported() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void stop() {
|
||||
requestTemplate.stop();
|
||||
}
|
||||
|
||||
}
|
||||
@ -15,9 +15,6 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edqs;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.protobuf.ByteString;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
@ -28,7 +25,6 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Service;
|
||||
@ -44,30 +40,24 @@ import org.thingsboard.server.common.data.edqs.EdqsSyncRequest;
|
||||
import org.thingsboard.server.common.data.edqs.Entity;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.JsonDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.edqs.processor.EdqsProducer;
|
||||
import org.thingsboard.server.edqs.state.EdqsPartitionService;
|
||||
import org.thingsboard.server.edqs.util.EdqsConverter;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdqsEventMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.EdqsRequestMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.FromEdqsMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsCoreServiceMsg;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos.ToEdqsMsg;
|
||||
import org.thingsboard.server.queue.TbQueueRequestTemplate;
|
||||
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
|
||||
import org.thingsboard.server.queue.discovery.HashPartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.discovery.TopicService;
|
||||
import org.thingsboard.server.queue.edqs.EdqsQueue;
|
||||
import org.thingsboard.server.queue.environment.DistributedLock;
|
||||
@ -75,7 +65,6 @@ import org.thingsboard.server.queue.environment.DistributedLockService;
|
||||
import org.thingsboard.server.queue.provider.EdqsClientQueueFactory;
|
||||
import org.thingsboard.server.queue.util.AfterStartUp;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -88,20 +77,18 @@ public class DefaultEdqsService implements EdqsService {
|
||||
private final EdqsClientQueueFactory queueFactory;
|
||||
private final EdqsConverter edqsConverter;
|
||||
private final EdqsSyncService edqsSyncService;
|
||||
private final EdqsApiService edqsApiService;
|
||||
private final DistributedLockService distributedLockService;
|
||||
private final AttributesService attributesService;
|
||||
private final EdqsPartitionService edqsPartitionService;
|
||||
private final TopicService topicService;
|
||||
private final TbServiceInfoProvider serviceInfoProvider;
|
||||
@Autowired @Lazy
|
||||
private TbClusterService clusterService;
|
||||
@Autowired @Lazy
|
||||
private HashPartitionService hashPartitionService;
|
||||
|
||||
@Value("${queue.edqs.api_enabled:false}")
|
||||
private Boolean apiEnabled;
|
||||
|
||||
private EdqsProducer eventsProducer;
|
||||
private TbQueueRequestTemplate<TbProtoQueueMsg<ToEdqsMsg>, TbProtoQueueMsg<FromEdqsMsg>> requestTemplate;
|
||||
private ExecutorService executor;
|
||||
private DistributedLock syncLock;
|
||||
|
||||
@ -114,17 +101,14 @@ public class DefaultEdqsService implements EdqsService {
|
||||
.topicService(topicService)
|
||||
.producer(queueFactory.createEdqsMsgProducer(EdqsQueue.EVENTS))
|
||||
.build();
|
||||
if (apiEnabled) {
|
||||
apiEnabled = null;
|
||||
}
|
||||
|
||||
requestTemplate = queueFactory.createEdqsRequestTemplate();
|
||||
requestTemplate.init();
|
||||
syncLock = distributedLockService.getLock("edqs_sync");
|
||||
}
|
||||
|
||||
@AfterStartUp(order = AfterStartUp.REGULAR_SERVICE)
|
||||
public void onStartUp() {
|
||||
if (!serviceInfoProvider.isService(ServiceType.TB_CORE)) {
|
||||
return;
|
||||
}
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
EdqsSyncState syncState = getSyncState();
|
||||
@ -134,13 +118,9 @@ public class DefaultEdqsService implements EdqsService {
|
||||
.syncRequest(new EdqsSyncRequest())
|
||||
.build());
|
||||
}
|
||||
} else { // only if topic/RocksDB is not empty and sync is finished
|
||||
if (apiEnabled == null) {
|
||||
log.info("EDQS is already synced, enabling API");
|
||||
apiEnabled = true;
|
||||
} else {
|
||||
log.info("EDQS is already synced");
|
||||
}
|
||||
} else if (edqsApiService.isSupported()) {
|
||||
// only if topic/RocksDB is not empty and sync is finished
|
||||
edqsApiService.setEnabled(true);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("Failed to start EDQS service", e);
|
||||
@ -163,7 +143,7 @@ public class DefaultEdqsService implements EdqsService {
|
||||
log.info("Processing system msg {}", msg);
|
||||
try {
|
||||
if (msg.getApiEnabled() != null) {
|
||||
apiEnabled = msg.getApiEnabled();
|
||||
edqsApiService.setEnabled(msg.getApiEnabled());
|
||||
}
|
||||
|
||||
if (msg.getSyncRequest() != null) {
|
||||
@ -177,9 +157,9 @@ public class DefaultEdqsService implements EdqsService {
|
||||
|
||||
saveSyncState(EdqsSyncStatus.STARTED);
|
||||
edqsSyncService.sync();
|
||||
|
||||
saveSyncState(EdqsSyncStatus.FINISHED);
|
||||
if (apiEnabled == null) {
|
||||
|
||||
if (edqsApiService.isSupported()) {
|
||||
broadcast(ToCoreEdqsMsg.builder()
|
||||
.apiEnabled(Boolean.TRUE)
|
||||
.build());
|
||||
@ -229,30 +209,6 @@ public class DefaultEdqsService implements EdqsService {
|
||||
processEvent(tenantId, objectType, EdqsEventType.DELETED, object);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
|
||||
var requestMsg = newEdqsMsg(tenantId)
|
||||
.setRequestMsg(EdqsRequestMsg.newBuilder()
|
||||
.setValue(JacksonUtil.toString(request))
|
||||
.build());
|
||||
if (customerId != null && !customerId.isNullUid()) {
|
||||
requestMsg.setCustomerIdMSB(customerId.getId().getMostSignificantBits());
|
||||
requestMsg.setCustomerIdLSB(customerId.getId().getLeastSignificantBits());
|
||||
}
|
||||
|
||||
Integer partition = edqsPartitionService.resolvePartition(tenantId);
|
||||
ListenableFuture<TbProtoQueueMsg<FromEdqsMsg>> resultFuture = requestTemplate.send(new TbProtoQueueMsg<>(UUID.randomUUID(), requestMsg.build()), partition);
|
||||
return Futures.transform(resultFuture, msg -> {
|
||||
TransportProtos.EdqsResponseMsg responseMsg = msg.getValue().getResponseMsg();
|
||||
return JacksonUtil.fromString(responseMsg.getValue(), EdqsResponse.class);
|
||||
}, MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isApiEnabled() {
|
||||
return Boolean.TRUE.equals(apiEnabled);
|
||||
}
|
||||
|
||||
protected void processEvent(TenantId tenantId, ObjectType objectType, EdqsEventType eventType, EdqsObject object) {
|
||||
executor.submit(() -> {
|
||||
try {
|
||||
@ -266,7 +222,10 @@ public class DefaultEdqsService implements EdqsService {
|
||||
if (version != null) {
|
||||
eventMsg.setVersion(version);
|
||||
}
|
||||
eventsProducer.send(tenantId, objectType, key, newEdqsMsg(tenantId)
|
||||
eventsProducer.send(tenantId, objectType, key, ToEdqsMsg.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setTs(System.currentTimeMillis())
|
||||
.setEventMsg(eventMsg)
|
||||
.build());
|
||||
} catch (Throwable e) {
|
||||
@ -293,20 +252,6 @@ public class DefaultEdqsService implements EdqsService {
|
||||
.build());
|
||||
}
|
||||
|
||||
private static ToEdqsMsg.Builder newEdqsMsg(TenantId tenantId) {
|
||||
return ToEdqsMsg.newBuilder()
|
||||
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||
.setTs(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void preDestroy() {
|
||||
executor.shutdown();
|
||||
eventsProducer.stop();
|
||||
requestTemplate.stop();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private EdqsSyncState getSyncState() {
|
||||
EdqsSyncState state = attributesService.find(TenantId.SYS_TENANT_ID, TenantId.SYS_TENANT_ID, AttributeScope.SERVER_SCOPE, "edqsSyncState").get(30, TimeUnit.SECONDS)
|
||||
@ -326,6 +271,12 @@ public class DefaultEdqsService implements EdqsService {
|
||||
System.currentTimeMillis())).get(30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
private void stop() {
|
||||
executor.shutdown();
|
||||
eventsProducer.stop();
|
||||
}
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
|
||||
@ -23,7 +23,7 @@ import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.query.EntityCountQuery;
|
||||
import org.thingsboard.server.common.data.query.EntityData;
|
||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.edqs.state.EdqsStateService;
|
||||
import org.thingsboard.server.edqs.util.EdqsRocksDb;
|
||||
@ -43,9 +43,9 @@ import static org.awaitility.Awaitility.await;
|
||||
public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
|
||||
|
||||
@Autowired
|
||||
private EdqsService edqsService;
|
||||
private EdqsApiService edqsApiService;
|
||||
|
||||
@Autowired(required = false)
|
||||
@Autowired
|
||||
private EdqsStateService edqsStateService;
|
||||
|
||||
@MockBean // so that we don't do backup for tests
|
||||
@ -53,7 +53,7 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled() && edqsStateService.isReady());
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsApiService.isEnabled() && edqsStateService.isReady());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -32,7 +32,7 @@ import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
|
||||
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
|
||||
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
|
||||
import org.thingsboard.server.dao.service.DaoSqlTest;
|
||||
import org.thingsboard.server.edqs.util.EdqsRocksDb;
|
||||
|
||||
@ -56,14 +56,14 @@ import static org.awaitility.Awaitility.await;
|
||||
public class EdqsEntityServiceTest extends EntityServiceTest {
|
||||
|
||||
@Autowired
|
||||
private EdqsService edqsService;
|
||||
private EdqsApiService edqsApiService;
|
||||
|
||||
@MockBean
|
||||
private EdqsRocksDb edqsRocksDb;
|
||||
|
||||
@Before
|
||||
public void beforeEach() {
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsService.isApiEnabled());
|
||||
await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> edqsApiService.isEnabled());
|
||||
}
|
||||
|
||||
// sql implementation has a bug with data duplication, edqs implementation returns correct value
|
||||
|
||||
@ -0,0 +1,34 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.edqs;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
public interface EdqsApiService {
|
||||
|
||||
ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request);
|
||||
|
||||
boolean isEnabled();
|
||||
|
||||
void setEnabled(boolean enabled);
|
||||
|
||||
boolean isSupported();
|
||||
|
||||
}
|
||||
@ -15,23 +15,15 @@
|
||||
*/
|
||||
package org.thingsboard.server.common.msg.edqs;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
public interface EdqsService {
|
||||
|
||||
ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request);
|
||||
|
||||
boolean isApiEnabled();
|
||||
|
||||
void onUpdate(TenantId tenantId, EntityId entityId, Object entity);
|
||||
|
||||
void onUpdate(TenantId tenantId, ObjectType objectType, EdqsObject object);
|
||||
|
||||
@ -45,7 +45,7 @@ import org.thingsboard.server.common.data.query.EntityListFilter;
|
||||
import org.thingsboard.server.common.data.query.EntityTypeFilter;
|
||||
import org.thingsboard.server.common.data.query.KeyFilter;
|
||||
import org.thingsboard.server.common.data.query.RelationsQueryFilter;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
|
||||
import org.thingsboard.server.dao.exception.IncorrectParameterException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -85,8 +85,8 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
||||
@Lazy
|
||||
EntityServiceRegistry entityServiceRegistry;
|
||||
|
||||
@Autowired @Lazy
|
||||
private EdqsService edqsService;
|
||||
@Autowired
|
||||
private EdqsApiService edqsApiService;
|
||||
|
||||
@Override
|
||||
public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) {
|
||||
@ -95,7 +95,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
||||
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
|
||||
validateEntityCountQuery(query);
|
||||
|
||||
if (edqsService.isApiEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
|
||||
if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
|
||||
EdqsRequest request = EdqsRequest.builder()
|
||||
.entityCountQuery(query)
|
||||
.build();
|
||||
@ -112,7 +112,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
||||
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
|
||||
validateEntityDataQuery(query);
|
||||
|
||||
if (edqsService.isApiEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
|
||||
if (edqsApiService.isEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
|
||||
EdqsRequest request = EdqsRequest.builder()
|
||||
.entityDataQuery(query)
|
||||
.build();
|
||||
@ -143,7 +143,7 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
|
||||
EdqsResponse response;
|
||||
try {
|
||||
log.debug("[{}] Sending request to EDQS: {}", tenantId, request);
|
||||
response = edqsService.processRequest(tenantId, customerId, request).get();
|
||||
response = edqsApiService.processRequest(tenantId, customerId, request).get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
/**
|
||||
* Copyright © 2016-2025 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.query;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsApiService;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
@ConditionalOnMissingBean(value = EdqsApiService.class, ignored = DummyEdqsApiService.class)
|
||||
public class DummyEdqsApiService implements EdqsApiService {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isEnabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
log.warn("Got request to enable EDQS API, but it isn't supported", new RuntimeException("stacktrace"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
@ -15,16 +15,12 @@
|
||||
*/
|
||||
package org.thingsboard.server.dao.sql.query;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.thingsboard.server.common.data.ObjectType;
|
||||
import org.thingsboard.server.common.data.edqs.EdqsObject;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsMsg;
|
||||
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsRequest;
|
||||
import org.thingsboard.server.common.data.edqs.query.EdqsResponse;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
@ -33,16 +29,6 @@ import org.thingsboard.server.common.msg.edqs.EdqsService;
|
||||
@ConditionalOnMissingBean(value = EdqsService.class, ignored = DummyEdqsService.class)
|
||||
public class DummyEdqsService implements EdqsService {
|
||||
|
||||
@Override
|
||||
public ListenableFuture<EdqsResponse> processRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isApiEnabled() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUpdate(TenantId tenantId, EntityId entityId, Object entity) {}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user