Merge pull request #13850 from thingsboard/rc

rc
This commit is contained in:
Viacheslav Klimov 2025-08-11 13:02:44 +03:00 committed by GitHub
commit c05f41ad8d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
48 changed files with 596 additions and 383 deletions

View File

@ -59,8 +59,7 @@ import org.thingsboard.server.gen.edge.v1.RequestMsg;
import org.thingsboard.server.gen.edge.v1.ResponseMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.queue.util.AfterStartUp;
import org.thingsboard.server.queue.util.TbCoreComponent;
@ -153,10 +152,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private TbCoreQueueFactory tbCoreQueueFactory;
@Autowired
private Optional<TbKafkaSettings> kafkaSettings;
@Autowired
private Optional<TbKafkaTopicConfigs> kafkaTopicConfigs;
private Optional<KafkaAdmin> kafkaAdmin;
private Server server;
@ -232,8 +228,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
}
private EdgeGrpcSession createEdgeGrpcSession(StreamObserver<ResponseMsg> outputStream) {
return kafkaSettings.isPresent() && kafkaTopicConfigs.isPresent()
? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaSettings.get(), kafkaTopicConfigs.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect,
return kafkaAdmin.isPresent()
? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaAdmin.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect,
sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession)
: new PostgresEdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect,
sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession);
@ -643,10 +639,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
List<EdgeId> toRemove = new ArrayList<>();
for (EdgeGrpcSession session : sessions.values()) {
if (session instanceof KafkaEdgeGrpcSession kafkaSession &&
!kafkaSession.isConnected() &&
kafkaSession.getConsumer() != null &&
kafkaSession.getConsumer().getConsumer() != null &&
!kafkaSession.getConsumer().getConsumer().isStopped()) {
!kafkaSession.isConnected() &&
kafkaSession.getConsumer() != null &&
kafkaSession.getConsumer().getConsumer() != null &&
!kafkaSession.getConsumer().getConsumer().isStopped()) {
toRemove.add(kafkaSession.getEdge().getId());
}
}
@ -663,4 +659,5 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
log.warn("Failed to cleanup kafka sessions", e);
}
}
}

View File

@ -32,9 +32,7 @@ import org.thingsboard.server.queue.TbQueueConsumer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.common.consumer.QueueConsumerManager;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
import org.thingsboard.server.service.edge.EdgeContextComponent;
@ -51,9 +49,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
private final TopicService topicService;
private final TbCoreQueueFactory tbCoreQueueFactory;
private final TbKafkaSettings kafkaSettings;
private final TbKafkaTopicConfigs kafkaTopicConfigs;
private final KafkaAdmin kafkaAdmin;
private volatile boolean isHighPriorityProcessing;
@ -63,21 +59,20 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
private ExecutorService consumerExecutor;
public KafkaEdgeGrpcSession(EdgeContextComponent ctx, TopicService topicService, TbCoreQueueFactory tbCoreQueueFactory,
TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs, StreamObserver<ResponseMsg> outputStream,
KafkaAdmin kafkaAdmin, StreamObserver<ResponseMsg> outputStream,
BiConsumer<EdgeId, EdgeGrpcSession> sessionOpenListener, BiConsumer<Edge, UUID> sessionCloseListener,
ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize, int maxHighPriorityQueueSizePerSession) {
super(ctx, outputStream, sessionOpenListener, sessionCloseListener, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession);
this.topicService = topicService;
this.tbCoreQueueFactory = tbCoreQueueFactory;
this.kafkaSettings = kafkaSettings;
this.kafkaTopicConfigs = kafkaTopicConfigs;
this.kafkaAdmin = kafkaAdmin;
}
private void processMsgs(List<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> msgs, TbQueueConsumer<TbProtoQueueMsg<ToEdgeEventNotificationMsg>> consumer) {
log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId());
if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) {
log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " +
"connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration",
"connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration",
tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing);
return;
}
@ -159,7 +154,6 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession {
@Override
public void cleanUp() {
String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic();
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
kafkaAdmin.deleteTopic(topic);
kafkaAdmin.deleteConsumerGroup(topic);
}

View File

@ -35,7 +35,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.MsgCounters;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.util.Collections;
@ -63,7 +63,7 @@ public class EdgeStatsService {
private final TimeseriesService tsService;
private final EdgeStatsCounterService statsCounterService;
private final TopicService topicService;
private final Optional<TbKafkaAdmin> tbKafkaAdmin;
private final Optional<KafkaAdmin> kafkaAdmin;
@Value("${edges.stats.ttl:30}")
private int edgesStatsTtlDays;
@ -81,12 +81,12 @@ public class EdgeStatsService {
long ts = now - (now % reportIntervalMillis);
Map<EdgeId, MsgCounters> countersByEdge = statsCounterService.getCounterByEdge();
Map<EdgeId, Long> lagByEdgeId = tbKafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap();
Map<EdgeId, Long> lagByEdgeId = kafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap();
Map<EdgeId, MsgCounters> countersByEdgeSnapshot = new HashMap<>(statsCounterService.getCounterByEdge());
countersByEdgeSnapshot.forEach((edgeId, counters) -> {
TenantId tenantId = counters.getTenantId();
if (tbKafkaAdmin.isPresent()) {
if (kafkaAdmin.isPresent()) {
counters.getMsgsLag().set(lagByEdgeId.getOrDefault(edgeId, 0L));
}
List<TsKvEntry> statsEntries = List.of(
@ -109,7 +109,7 @@ public class EdgeStatsService {
e -> topicService.buildEdgeEventNotificationsTopicPartitionInfo(e.getValue().getTenantId(), e.getKey()).getTopic()
));
Map<String, Long> lagByTopic = tbKafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values()));
Map<String, Long> lagByTopic = kafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values()));
return edgeToTopicMap.entrySet().stream()
.collect(Collectors.toMap(

View File

@ -20,10 +20,8 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import java.util.Collections;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@ -33,8 +31,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
private final boolean syncNeeded;
public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, TopicService topicService, EdqsConfig edqsConfig) {
TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap());
public KafkaEdqsSyncService(KafkaAdmin kafkaAdmin, TopicService topicService, EdqsConfig edqsConfig) {
this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions())
.mapToObj(partition -> TopicPartitionInfo.builder()
.topic(topicService.buildTopicName(edqsConfig.getEventsTopic()))

View File

@ -176,8 +176,8 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb
for (int i = oldPartitions; i < newPartitions; i++) {
tbQueueAdmin.createTopicIfNotExists(
new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(),
queue.getCustomProperties()
);
queue.getCustomProperties(),
true); // forcing topic creation because the topic may still be cached on some nodes
}
}

View File

@ -140,8 +140,10 @@ public class DefaultFirebaseService implements FirebaseService {
}
public void destroy() {
app.delete();
app = null;
if (app != null) {
app.delete();
app = null;
}
messaging = null;
log.debug("[{}] Destroyed FirebaseContext", key);
}

View File

@ -57,6 +57,7 @@ import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.sql.query.EntityKeyMapping;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
@ -224,7 +225,7 @@ public class DefaultEntityQueryService implements EntityQueryService {
private EntityDataQuery buildEntityDataQuery(AlarmCountQuery query) {
EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null,
new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY)));
new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, EntityKeyMapping.CREATED_TIME)));
return new EntityDataQuery(query.getEntityFilter(), edpl, null, null, query.getKeyFilters());
}
@ -232,7 +233,7 @@ public class DefaultEntityQueryService implements EntityQueryService {
EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder();
EntityDataSortOrder entitiesSortOrder;
if (sortOrder == null || sortOrder.getKey().getType().equals(EntityKeyType.ALARM_FIELD)) {
entitiesSortOrder = new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY));
entitiesSortOrder = new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, EntityKeyMapping.CREATED_TIME));
} else {
entitiesSortOrder = sortOrder;
}

View File

@ -30,9 +30,7 @@ import org.thingsboard.server.dao.edge.EdgeService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaSettings;
import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.util.TbCoreComponent;
import java.time.Instant;
@ -57,7 +55,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
private final TenantService tenantService;
private final EdgeService edgeService;
private final AttributesService attributesService;
private final TbKafkaAdmin kafkaAdmin;
private final KafkaAdmin kafkaAdmin;
@Value("${sql.ttl.edge_events.edge_events_ttl:2628000}")
private long ttlSeconds;
@ -67,13 +65,13 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService,
TenantService tenantService, AttributesService attributesService,
TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) {
TopicService topicService, KafkaAdmin kafkaAdmin) {
super(partitionService);
this.topicService = topicService;
this.tenantService = tenantService;
this.edgeService = edgeService;
this.attributesService = attributesService;
this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs());
this.kafkaAdmin = kafkaAdmin;
}
@Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}")
@ -82,8 +80,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService {
return;
}
Set<String> topics = kafkaAdmin.getAllTopics();
if (topics == null || topics.isEmpty()) {
Set<String> topics = kafkaAdmin.listTopics();
if (topics.isEmpty()) {
return;
}

View File

@ -742,9 +742,9 @@ redis:
# Minumum number of idle connections that can be maintained in the pool without being closed
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
# Enable/Disable PING command sent when a connection is borrowed
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
# The property is used to specify whether to test the connection before returning it to the connection pool.
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
# The property is used in the context of connection pooling in Redis
testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
# Minimum time that an idle connection should be idle before it can be evicted from the connection pool. The value is set in milliseconds

View File

@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
@ -70,12 +73,24 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest {
result -> result.getTotalElements() == expectedResultSize);
}
@Override
protected PageData<AlarmData> findAlarmsByQueryAndCheck(AlarmDataQuery query, int expectedResultSize) {
return await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> findAlarmsByQuery(query),
result -> result.getTotalElements() == expectedResultSize);
}
@Override
protected Long countByQueryAndCheck(EntityCountQuery query, long expectedResult) {
return await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> countByQuery(query),
result -> result == expectedResult);
}
@Override
protected Long countAlarmsByQueryAndCheck(AlarmCountQuery query, long expectedResult) {
return await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> countAlarmsByQuery(query),
result -> result == expectedResult);
}
@Test
public void testEdqsState() throws Exception {
loginSysAdmin();

View File

@ -278,8 +278,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
assetTypeFilter.setEntityType(EntityType.ASSET);
AlarmCountQuery assetAlarmQuery = new AlarmCountQuery(assetTypeFilter);
Long assetAlamCount = doPostWithResponse("/api/alarmsQuery/count", assetAlarmQuery, Long.class);
Assert.assertEquals(assets.size(), assetAlamCount.longValue());
countAlarmsByQueryAndCheck(assetAlarmQuery, assets.size());
KeyFilter nameFilter = buildStringKeyFilter(EntityKeyType.ENTITY_FIELD, "name", StringFilterPredicate.StringOperation.STARTS_WITH, "Asset1");
List<KeyFilter> keyFilters = Collections.singletonList(nameFilter);
@ -369,8 +368,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
assetTypeFilter.setEntityType(EntityType.ASSET);
AlarmCountQuery assetAlarmQuery = new AlarmCountQuery(assetTypeFilter);
Long assetAlamCount = doPostWithResponse("/api/alarmsQuery/count", assetAlarmQuery, Long.class);
Assert.assertEquals(10, assetAlamCount.longValue());
countAlarmsByQueryAndCheck(assetAlarmQuery, 10);
KeyFilter nameFilter = buildStringKeyFilter(EntityKeyType.ENTITY_FIELD, "name", StringFilterPredicate.StringOperation.STARTS_WITH, "Asset1");
List<KeyFilter> keyFilters = Collections.singletonList(nameFilter);
@ -438,9 +436,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
assetTypeFilter.setEntityType(EntityType.ASSET);
AlarmDataQuery assetAlarmQuery = new AlarmDataQuery(assetTypeFilter, pageLink, null, null, null, alarmFields);
PageData<AlarmData> alarmPageData = doPostWithTypedResponse("/api/alarmsQuery/find", assetAlarmQuery, new TypeReference<>() {
});
Assert.assertEquals(10, alarmPageData.getTotalElements());
PageData<AlarmData> alarmPageData = findAlarmsByQueryAndCheck(assetAlarmQuery, 10);
List<String> retrievedAlarmTypes = alarmPageData.getData().stream().map(Alarm::getType).toList();
assertThat(retrievedAlarmTypes).containsExactlyInAnyOrderElementsOf(assetAlarmTypes);
@ -511,9 +507,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
assetTypeFilter.setEntityType(EntityType.ASSET);
AlarmDataQuery assetAlarmQuery = new AlarmDataQuery(assetTypeFilter, pageLink, null, null, null, Collections.emptyList());
PageData<AlarmData> alarmPageData = doPostWithTypedResponse("/api/alarmsQuery/find", assetAlarmQuery, new TypeReference<>() {
});
Assert.assertEquals(10, alarmPageData.getTotalElements());
PageData<AlarmData> alarmPageData = findAlarmsByQueryAndCheck(assetAlarmQuery, 10);
List<String> retrievedAlarmTypes = alarmPageData.getData().stream().map(Alarm::getType).toList();
assertThat(retrievedAlarmTypes).containsExactlyInAnyOrderElementsOf(assetAlarmTypes);
@ -1141,22 +1135,42 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
});
}
protected PageData<AlarmData> findAlarmsByQuery(AlarmDataQuery query) throws Exception {
return doPostWithTypedResponse("/api/alarmsQuery/find", query, new TypeReference<>() {});
}
protected PageData<EntityData> findByQueryAndCheck(EntityDataQuery query, int expectedResultSize) throws Exception {
PageData<EntityData> result = findByQuery(query);
assertThat(result.getTotalElements()).isEqualTo(expectedResultSize);
return result;
}
protected PageData<AlarmData> findAlarmsByQueryAndCheck(AlarmDataQuery query, int expectedResultSize) throws Exception {
PageData<AlarmData> result = findAlarmsByQuery(query);
assertThat(result.getTotalElements()).isEqualTo(expectedResultSize);
return result;
}
protected Long countByQuery(EntityCountQuery countQuery) throws Exception {
return doPostWithResponse("/api/entitiesQuery/count", countQuery, Long.class);
}
protected Long countAlarmsByQuery(AlarmCountQuery countQuery) throws Exception {
return doPostWithResponse("/api/alarmsQuery/count", countQuery, Long.class);
}
protected Long countByQueryAndCheck(EntityCountQuery query, long expectedResult) throws Exception {
Long result = countByQuery(query);
assertThat(result).isEqualTo(expectedResult);
return result;
}
protected Long countAlarmsByQueryAndCheck(AlarmCountQuery query, long expectedResult) throws Exception {
Long result = countAlarmsByQuery(query);
assertThat(result).isEqualTo(expectedResult);
return result;
}
private KeyFilter getEntityFieldStringEqualToKeyFilter(String keyName, String value) {
KeyFilter tenantOwnerNameFilter = new KeyFilter();
tenantOwnerNameFilter.setKey(new EntityKey(EntityKeyType.ENTITY_FIELD, keyName));

View File

@ -33,7 +33,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService;
import org.thingsboard.server.dao.edge.stats.MsgCounters;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.service.edge.stats.EdgeStatsService;
import java.util.List;
@ -141,7 +141,7 @@ public class EdgeStatsTest {
TopicPartitionInfo partitionInfo = new TopicPartitionInfo(topic, tenantId, 0, false);
when(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)).thenReturn(partitionInfo);
TbKafkaAdmin kafkaAdmin = mock(TbKafkaAdmin.class);
KafkaAdmin kafkaAdmin = mock(KafkaAdmin.class);
when(kafkaAdmin.getTotalLagForGroupsBulk(Set.of(topic)))
.thenReturn(Map.of(topic, 15L));

View File

@ -79,6 +79,7 @@ import org.thingsboard.server.common.data.query.RelationsQueryFilter;
import org.thingsboard.server.common.data.query.SingleEntityFilter;
import org.thingsboard.server.common.data.query.StringFilterPredicate;
import org.thingsboard.server.common.data.query.StringFilterPredicate.StringOperation;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
@ -118,8 +119,10 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.thingsboard.server.common.data.AttributeScope.SERVER_SCOPE;
import static org.thingsboard.server.common.data.query.EntityKeyType.ATTRIBUTE;
import static org.thingsboard.server.common.data.query.EntityKeyType.ENTITY_FIELD;
import static org.thingsboard.server.common.data.query.EntityKeyType.SERVER_ATTRIBUTE;
@Slf4j
@DaoSqlTest
@ -604,7 +607,7 @@ public class EntityServiceTest extends AbstractControllerTest {
List<ListenableFuture<AttributesSaveResult>> attributeFutures = new ArrayList<>();
for (int i = 0; i < assets.size(); i++) {
Asset asset = assets.get(i);
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE));
attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), SERVER_SCOPE));
}
Futures.allAsList(attributeFutures).get();
@ -1745,6 +1748,33 @@ public class EntityServiceTest extends AbstractControllerTest {
deviceService.deleteDevicesByTenantId(tenantId);
}
@Test
public void testFindTenantTelemetry() {
// save timeseries by sys admin
BasicTsKvEntry timeseries = new BasicTsKvEntry(42L, new DoubleDataEntry("temperature", 45.5));
timeseriesService.save(TenantId.SYS_TENANT_ID, tenantId, timeseries);
AttributeKvEntry attr = new BaseAttributeKvEntry(new LongDataEntry("attr", 10L), 42L);
attributesService.save(TenantId.SYS_TENANT_ID, tenantId, SERVER_SCOPE, List.of(attr));
SingleEntityFilter singleEntityFilter = new SingleEntityFilter();
singleEntityFilter.setSingleEntity(AliasEntityId.fromEntityId(tenantId));
List<EntityKey> entityFields = List.of(
new EntityKey(ENTITY_FIELD, "name")
);
List<EntityKey> latestValues = List.of(
new EntityKey(EntityKeyType.TIME_SERIES, "temperature"),
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "attr")
);
EntityDataPageLink pageLink = new EntityDataPageLink(1000, 0, null, null);
EntityDataQuery query = new EntityDataQuery(singleEntityFilter, pageLink, entityFields, latestValues, null);
findByQueryAndCheckTelemetry(query, EntityKeyType.TIME_SERIES, "temperature", List.of("45.5"));
findByQueryAndCheckTelemetry(query, EntityKeyType.SERVER_ATTRIBUTE, "attr", List.of("10"));
}
@Test
public void testBuildStringPredicateQueryOperations() throws ExecutionException, InterruptedException {

View File

@ -507,7 +507,7 @@ public class TbRuleEngineQueueConsumerManagerTest {
consumerManager.delete(true);
await().atMost(2, TimeUnit.SECONDS)
await().atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
verify(ruleEngineMsgProducer).send(any(), any(), any());
});

View File

@ -79,10 +79,10 @@ public abstract class TBRedisCacheConfiguration {
@Value("${redis.pool_config.minIdle:16}")
private int minIdle;
@Value("${redis.pool_config.testOnBorrow:true}")
@Value("${redis.pool_config.testOnBorrow:false}")
private boolean testOnBorrow;
@Value("${redis.pool_config.testOnReturn:true}")
@Value("${redis.pool_config.testOnReturn:false}")
private boolean testOnReturn;
@Value("${redis.pool_config.testWhileIdle:true}")

View File

@ -16,7 +16,7 @@
package org.thingsboard.server.queue;
public interface TbEdgeQueueAdmin extends TbQueueAdmin {
void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId);
void deleteConsumerGroup(String consumerGroupId);
}

View File

@ -18,12 +18,13 @@ package org.thingsboard.server.queue;
public interface TbQueueAdmin {
default void createTopicIfNotExists(String topic) {
createTopicIfNotExists(topic, null);
createTopicIfNotExists(topic, null, false);
}
void createTopicIfNotExists(String topic, String properties);
void createTopicIfNotExists(String topic, String properties, boolean force);
void destroy();
void deleteTopic(String topic);
}

View File

@ -38,7 +38,7 @@ import org.thingsboard.server.common.data.ai.provider.OpenAiProviderConfig;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "provider",
visible = true
)

View File

@ -43,7 +43,7 @@ import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.edqs.EdqsExecutors;
import org.thingsboard.server.queue.edqs.KafkaEdqsComponent;
import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
import java.util.HashMap;
@ -68,6 +68,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
private final EdqsExecutors edqsExecutors;
private final EdqsMapper mapper;
private final TopicService topicService;
private final KafkaAdmin kafkaAdmin;
@Autowired
@Lazy
private EdqsProcessor edqsProcessor;
@ -86,7 +87,6 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public void init(PartitionedQueueConsumerManager<TbProtoQueueMsg<ToEdqsMsg>> eventConsumer, List<PartitionedQueueConsumerManager<?>> otherConsumers) {
versionsStore = new VersionsStore(config.getVersionsCacheTtl());
TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin();
stateConsumer = PartitionedQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>create()
.queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic()))
.topic(topicService.buildTopicName(config.getStateTopic()))
@ -106,7 +106,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
consumer.commit();
})
.consumerCreator((config, tpi) -> queueFactory.createEdqsStateConsumer())
.queueAdmin(queueAdmin)
.queueAdmin(queueFactory.getEdqsQueueAdmin())
.consumerExecutor(edqsExecutors.getConsumersExecutor())
.taskExecutor(edqsExecutors.getConsumerTaskExecutor())
.scheduler(edqsExecutors.getScheduler())
@ -174,7 +174,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
// (because we need to be able to consume the same topic-partition by multiple instances)
Map<String, Long> offsets = new HashMap<>();
try {
queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId())
kafkaAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId())
.forEach((topicPartition, offsetAndMetadata) -> {
offsets.put(topicPartition.topic(), offsetAndMetadata.offset());
});

View File

@ -43,7 +43,7 @@ public class RuleEngineTbQueueAdminFactory {
return new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic, String properties) {
public void createTopicIfNotExists(String topic, String properties, boolean force) {
}
@Override

View File

@ -0,0 +1,285 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PreDestroy;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.CachedValue;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@TbKafkaComponent
@Component
@Slf4j
public class KafkaAdmin {
/*
* TODO: Get rid of per consumer/producer TbKafkaAdmin,
* use single KafkaAdmin instance that accepts topicConfigs.
* */
private final TbKafkaSettings settings;
@Value("${queue.kafka.request.timeout.ms:30000}")
private int requestTimeoutMs;
@Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default
private int topicsCacheTtlMs;
private final LazyInitializer<AdminClient> adminClient;
private final CachedValue<Set<String>> topics;
public KafkaAdmin(@Lazy TbKafkaSettings settings) {
this.settings = settings;
this.adminClient = LazyInitializer.<AdminClient>builder()
.setInitializer(() -> AdminClient.create(settings.toAdminProps()))
.get();
this.topics = new CachedValue<>(() -> {
Set<String> topics = ConcurrentHashMap.newKeySet();
topics.addAll(listTopics());
return topics;
}, topicsCacheTtlMs);
}
public void createTopicIfNotExists(String topic, Map<String, String> properties, boolean force) {
Set<String> topics = getTopics();
if (!force && topics.contains(topic)) {
log.trace("Topic {} already present in cache", topic);
return;
}
log.debug("Creating topic {} with properties {}", topic, properties);
String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1;
NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties);
try {
getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
topics.add(topic);
} catch (ExecutionException ee) {
log.trace("Failed to create topic {} with properties {}", topic, properties, ee);
if (ee.getCause() instanceof TopicExistsException) {
//do nothing
} else {
log.warn("[{}] Failed to create topic", topic, ee);
throw new RuntimeException(ee);
}
} catch (Exception e) {
log.warn("[{}] Failed to create topic", topic, e);
throw new RuntimeException(e);
}
}
public void deleteTopic(String topic) {
log.debug("Deleting topic {}", topic);
try {
getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.error("Failed to delete kafka topic [{}].", topic, e);
}
}
private Set<String> getTopics() {
return topics.get();
}
public Set<String> listTopics() {
try {
Set<String> topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
log.trace("Listed topics: {}", topics);
return topics;
} catch (Exception e) {
log.error("Failed to get all topics.", e);
return Collections.emptySet();
}
}
public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
Map<String, Long> result = new HashMap<>();
for (String groupId : groupIds) {
result.put(groupId, getTotalConsumerGroupLag(groupId));
}
return result;
}
public long getTotalConsumerGroupLag(String groupId) {
try {
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getConsumerGroupOffsets(groupId);
if (committedOffsets.isEmpty()) {
return 0L;
}
Map<TopicPartition, OffsetSpec> latestOffsetsSpec = committedOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
return committedOffsets.entrySet().stream()
.mapToLong(entry -> {
TopicPartition tp = entry.getKey();
long committed = entry.getValue().offset();
long end = endOffsets.getOrDefault(tp,
new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset();
return end - committed;
}).sum();
} catch (Exception e) {
log.error("Failed to get total lag for consumer group: {}", groupId, e);
return 0L;
}
}
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
}
/**
* Sync offsets from a fat group to a single-partition group
* Migration back from single-partition consumer to a fat group is not supported
* TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers
* */
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
try {
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
if (partitionId == null) {
return;
}
syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId);
} catch (Exception e) {
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
}
}
public void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException {
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
if (oldOffsets.isEmpty()) {
return;
}
for (var consumerOffset : oldOffsets.entrySet()) {
var tp = consumerOffset.getKey();
if (!tp.topic().endsWith(topicSuffix)) {
continue;
}
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets = getConsumerGroupOffsets(newGroupId);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
} else if (existingOffset.offset() >= om.offset()) {
log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
break;
} else {
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
}
getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
break;
}
}
public boolean isTopicEmpty(String topic) {
return areAllTopicsEmpty(Set.of(topic));
}
public boolean areAllTopicsEmpty(Set<String> topics) {
try {
List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
if (existingTopics.isEmpty()) {
return true;
}
List<TopicPartition> allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS)
.entrySet().stream()
.flatMap(entry -> {
String topic = entry.getKey();
TopicDescription topicDescription = entry.getValue();
return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
})
.toList();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = getClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = getClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
for (TopicPartition partition : allPartitions) {
long beginningOffset = beginningOffsets.get(partition).offset();
long endOffset = endOffsets.get(partition).offset();
if (beginningOffset != endOffset) {
log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
return false;
}
}
return true;
} catch (Exception e) {
log.error("Failed to check if topics [{}] empty.", topics, e);
return false;
}
}
public void deleteConsumerGroup(String consumerGroupId) {
try {
getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("Failed to delete consumer group {}", consumerGroupId, e);
}
}
public AdminClient getClient() {
try {
return adminClient.get();
} catch (ConcurrentException e) {
throw new RuntimeException("Failed to initialize Kafka admin client", e);
}
}
@PreDestroy
private void destroy() throws Exception {
if (adminClient.isInitialized()) {
adminClient.get().close();
}
}
}

View File

@ -16,31 +16,12 @@
package org.thingsboard.server.queue.kafka;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.thingsboard.server.queue.TbEdgeQueueAdmin;
import org.thingsboard.server.queue.TbQueueAdmin;
import org.thingsboard.server.queue.util.PropertyUtils;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* Created by ashvayka on 24.09.18.
@ -52,251 +33,42 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin {
private final Map<String, String> topicConfigs;
@Getter
private final int numPartitions;
private volatile Set<String> topics;
private final short replicationFactor;
public TbKafkaAdmin(TbKafkaSettings settings, Map<String, String> topicConfigs) {
this.settings = settings;
this.topicConfigs = topicConfigs;
String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
if (numPartitionsStr != null) {
numPartitions = Integer.parseInt(numPartitionsStr);
} else {
numPartitions = 1;
}
replicationFactor = settings.getReplicationFactor();
}
@Override
public void createTopicIfNotExists(String topic, String properties) {
Set<String> topics = getTopics();
if (topics.contains(topic)) {
return;
}
try {
Map<String, String> configs = PropertyUtils.getProps(topicConfigs, properties);
configs.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING);
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(configs);
createTopic(newTopic).values().get(topic).get();
topics.add(topic);
} catch (ExecutionException ee) {
if (ee.getCause() instanceof TopicExistsException) {
//do nothing
} else {
log.warn("[{}] Failed to create topic", topic, ee);
throw new RuntimeException(ee);
}
} catch (Exception e) {
log.warn("[{}] Failed to create topic", topic, e);
throw new RuntimeException(e);
}
public void createTopicIfNotExists(String topic, String properties, boolean force) {
settings.getAdmin().createTopicIfNotExists(topic, PropertyUtils.getProps(topicConfigs, properties), force);
}
@Override
public void deleteTopic(String topic) {
Set<String> topics = getTopics();
if (topics.remove(topic)) {
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
try {
if (settings.getAdminClient().listTopics().names().get().contains(topic)) {
settings.getAdminClient().deleteTopics(Collections.singletonList(topic));
} else {
log.warn("Kafka topic [{}] does not exist.", topic);
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to delete kafka topic [{}].", topic, e);
}
}
}
private Set<String> getTopics() {
if (topics == null) {
synchronized (this) {
if (topics == null) {
topics = ConcurrentHashMap.newKeySet();
try {
topics.addAll(settings.getAdminClient().listTopics().names().get());
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
}
}
}
}
return topics;
}
public Set<String> getAllTopics() {
try {
return settings.getAdminClient().listTopics().names().get();
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to get all topics.", e);
}
return null;
}
public CreateTopicsResult createTopic(NewTopic topic) {
return settings.getAdminClient().createTopics(Collections.singletonList(topic));
settings.getAdmin().deleteTopic(topic);
}
@Override
public void destroy() {
}
/**
* Sync offsets from a fat group to a single-partition group
* Migration back from single-partition consumer to a fat group is not supported
* TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers
* */
public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) {
try {
log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId);
if (partitionId == null) {
return;
}
syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId);
} catch (Exception e) {
log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e);
}
}
/**
* Sync edge notifications offsets from a fat group to a single group per edge
* */
public void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId) {
try {
log.info("syncEdgeNotificationsOffsets [{}][{}]", fatGroupId, newGroupId);
syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId);
settings.getAdmin().syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId);
} catch (Exception e) {
log.warn("Failed to syncEdgeNotificationsOffsets from {} to {}", fatGroupId, newGroupId, e);
}
}
@Override
public void deleteConsumerGroup(String consumerGroupId) {
try {
settings.getAdminClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId));
} catch (Exception e) {
log.warn("Failed to delete consumer group {}", consumerGroupId, e);
}
}
void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException {
Map<TopicPartition, OffsetAndMetadata> oldOffsets = getConsumerGroupOffsets(fatGroupId);
if (oldOffsets.isEmpty()) {
return;
}
for (var consumerOffset : oldOffsets.entrySet()) {
var tp = consumerOffset.getKey();
if (!tp.topic().endsWith(topicSuffix)) {
continue;
}
var om = consumerOffset.getValue();
Map<TopicPartition, OffsetAndMetadata> newOffsets = getConsumerGroupOffsets(newGroupId);
var existingOffset = newOffsets.get(tp);
if (existingOffset == null) {
log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets);
} else if (existingOffset.offset() >= om.offset()) {
log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset());
break;
} else {
log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset());
}
settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS);
log.info("[{}] altered new consumer groupId {}", tp, newGroupId);
break;
}
}
@SneakyThrows
public Map<TopicPartition, OffsetAndMetadata> getConsumerGroupOffsets(String groupId) {
return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
}
public boolean isTopicEmpty(String topic) {
return areAllTopicsEmpty(Set.of(topic));
}
public boolean areAllTopicsEmpty(Set<String> topics) {
try {
List<String> existingTopics = getTopics().stream().filter(topics::contains).toList();
if (existingTopics.isEmpty()) {
return true;
}
List<TopicPartition> allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream()
.flatMap(entry -> {
String topic = entry.getKey();
TopicDescription topicDescription;
try {
topicDescription = entry.getValue().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()));
})
.toList();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get();
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream()
.collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get();
for (TopicPartition partition : allPartitions) {
long beginningOffset = beginningOffsets.get(partition).offset();
long endOffset = endOffsets.get(partition).offset();
if (beginningOffset != endOffset) {
log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic());
return false;
}
}
return true;
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to check if topics [{}] empty.", topics, e);
return false;
}
}
public Map<String, Long> getTotalLagForGroupsBulk(Set<String> groupIds) {
Map<String, Long> result = new HashMap<>();
for (String groupId : groupIds) {
result.put(groupId, getTotalConsumerGroupLag(groupId));
}
return result;
}
public long getTotalConsumerGroupLag(String groupId) {
try {
Map<TopicPartition, OffsetAndMetadata> committedOffsets = getConsumerGroupOffsets(groupId);
if (committedOffsets.isEmpty()) {
return 0L;
}
Map<TopicPartition, OffsetSpec> latestOffsetsSpec = committedOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
settings.getAdminClient().listOffsets(latestOffsetsSpec)
.all().get(10, TimeUnit.SECONDS);
return committedOffsets.entrySet().stream()
.mapToLong(entry -> {
TopicPartition tp = entry.getKey();
long committed = entry.getValue().offset();
long end = endOffsets.getOrDefault(tp,
new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset();
return end - committed;
}).sum();
} catch (Exception e) {
log.error("Failed to get total lag for consumer group: {}", groupId, e);
return 0L;
}
}
}

View File

@ -19,11 +19,11 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.util.TbKafkaComponent;
@Component
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
@Getter
@AllArgsConstructor
@NoArgsConstructor

View File

@ -26,10 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.time.Duration;
import java.util.ArrayList;
@ -44,11 +44,12 @@ import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
public class TbKafkaConsumerStatsService {
private final Set<String> monitoredGroups = ConcurrentHashMap.newKeySet();
private final TbKafkaSettings kafkaSettings;
private final KafkaAdmin kafkaAdmin;
private final TbKafkaConsumerStatisticConfig statsConfig;
private Consumer<String, byte[]> consumer;
@ -77,7 +78,7 @@ public class TbKafkaConsumerStatsService {
}
for (String groupId : monitoredGroups) {
try {
Map<TopicPartition, OffsetAndMetadata> groupOffsets = kafkaSettings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
Map<TopicPartition, OffsetAndMetadata> groupOffsets = kafkaSettings.getAdmin().getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata()
.get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS);
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration);
@ -159,12 +160,14 @@ public class TbKafkaConsumerStatsService {
@Override
public String toString() {
return "[" +
"topic=[" + topic + ']' +
", partition=[" + partition + "]" +
", committedOffset=[" + committedOffset + "]" +
", endOffset=[" + endOffset + "]" +
", lag=[" + lag + "]" +
"]";
"topic=[" + topic + ']' +
", partition=[" + partition + "]" +
", committedOffset=[" + committedOffset + "]" +
", endOffset=[" + endOffset + "]" +
", lag=[" + lag + "]" +
"]";
}
}
}

View File

@ -16,12 +16,10 @@
package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
@ -30,12 +28,13 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.TbProperty;
import org.thingsboard.server.queue.util.PropertyUtils;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -47,7 +46,7 @@ import java.util.Properties;
* Created by ashvayka on 25.09.18.
*/
@Slf4j
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
@ConfigurationProperties(prefix = "queue.kafka")
@Component
public class TbKafkaSettings {
@ -143,6 +142,9 @@ public class TbKafkaSettings {
@Value("${queue.kafka.consumer-properties-per-topic-inline:}")
private String consumerPropertiesPerTopicInline;
@Autowired
private KafkaAdmin kafkaAdmin;
@Deprecated
@Setter
private List<TbProperty> other;
@ -150,8 +152,6 @@ public class TbKafkaSettings {
@Setter
private Map<String, List<TbProperty>> consumerPropertiesPerTopic = new HashMap<>();
private volatile AdminClient adminClient;
@PostConstruct
public void initInlineTopicProperties() {
Map<String, List<TbProperty>> inlineProps = parseTopicPropertyList(consumerPropertiesPerTopicInline);
@ -240,15 +240,12 @@ public class TbKafkaSettings {
}
}
public AdminClient getAdminClient() {
if (adminClient == null) {
synchronized (this) {
if (adminClient == null) {
adminClient = AdminClient.create(toAdminProps());
}
}
}
return adminClient;
/*
* Temporary solution to avoid major code changes.
* FIXME: use single instance of Kafka queue admin, don't create a separate one for each consumer/producer
* */
public KafkaAdmin getAdmin() {
return kafkaAdmin;
}
protected Properties toAdminProps() {
@ -279,11 +276,4 @@ public class TbKafkaSettings {
return result;
}
@PreDestroy
private void destroy() {
if (adminClient != null) {
adminClient.close();
}
}
}

View File

@ -18,14 +18,14 @@ package org.thingsboard.server.queue.kafka;
import jakarta.annotation.PostConstruct;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.util.PropertyUtils;
import org.thingsboard.server.queue.util.TbKafkaComponent;
import java.util.Map;
@Component
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
@TbKafkaComponent
public class TbKafkaTopicConfigs {
public static final String NUM_PARTITIONS_SETTING = "partitions";

View File

@ -78,7 +78,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory
templateBuilder.queueAdmin(new TbQueueAdmin() {
@Override
public void createTopicIfNotExists(String topic, String properties) {}
public void createTopicIfNotExists(String topic, String properties, boolean force) {}
@Override
public void destroy() {}

View File

@ -58,6 +58,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -83,6 +84,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final TopicService topicService;
private final TbKafkaSettings kafkaSettings;
private final KafkaAdmin kafkaAdmin;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
@ -118,7 +120,9 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
private final AtomicLong consumerCount = new AtomicLong();
private final AtomicLong edgeConsumerCount = new AtomicLong();
public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
public KafkaMonolithQueueFactory(TopicService topicService,
TbKafkaSettings kafkaSettings,
KafkaAdmin kafkaAdmin,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -134,6 +138,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
TasksQueueConfig tasksQueueConfig) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
this.kafkaAdmin = kafkaAdmin;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@ -240,7 +245,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

View File

@ -52,6 +52,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.TopicService;
import org.thingsboard.server.queue.edqs.EdqsConfig;
import org.thingsboard.server.queue.kafka.KafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaAdmin;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService;
import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate;
@ -74,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final TopicService topicService;
private final TbKafkaSettings kafkaSettings;
private final KafkaAdmin kafkaAdmin;
private final TbServiceInfoProvider serviceInfoProvider;
private final TbQueueCoreSettings coreSettings;
private final TbQueueRuleEngineSettings ruleEngineSettings;
@ -99,6 +101,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
private final AtomicLong consumerCount = new AtomicLong();
public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings,
KafkaAdmin kafkaAdmin,
TbServiceInfoProvider serviceInfoProvider,
TbQueueCoreSettings coreSettings,
TbQueueRuleEngineSettings ruleEngineSettings,
@ -111,6 +114,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
TbKafkaTopicConfigs kafkaTopicConfigs) {
this.topicService = topicService;
this.kafkaSettings = kafkaSettings;
this.kafkaAdmin = kafkaAdmin;
this.serviceInfoProvider = serviceInfoProvider;
this.coreSettings = coreSettings;
this.ruleEngineSettings = ruleEngineSettings;
@ -234,7 +238,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
String queueName = configuration.getName();
String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId);
ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId
groupId, partitionId);
TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();

View File

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.queue.util;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({java.lang.annotation.ElementType.TYPE, java.lang.annotation.ElementType.METHOD})
@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka")
public @interface TbKafkaComponent {}

View File

@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
@SpringBootTest(classes = TbKafkaSettings.class)
@SpringBootTest(classes = {TbKafkaSettings.class, KafkaAdmin.class})
@TestPropertySource(properties = {
"queue.type=kafka",
"queue.kafka.bootstrap.servers=localhost:9092",

View File

@ -59,6 +59,7 @@ public class SslCredentialsWebServerCustomizer implements WebServerFactoryCustom
public void customize(ConfigurableServletWebServerFactory factory) {
SslCredentials sslCredentials = this.httpServerSslCredentialsConfig.getCredentials();
Ssl ssl = serverProperties.getSsl();
ssl.setBundle("default");
ssl.setKeyAlias(sslCredentials.getKeyAlias());
ssl.setKeyPassword(sslCredentials.getKeyPassword());
factory.setSsl(ssl);

View File

@ -0,0 +1,38 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.common.util;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class CachedValue<V> {
private final LoadingCache<Object, V> cache;
public CachedValue(Supplier<V> supplier, long valueTtlMs) {
this.cache = Caffeine.newBuilder()
.expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS)
.build(__ -> supplier.get());
}
public V get() {
return cache.get(this);
}
}

View File

@ -26,6 +26,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edqs.AttributeKv;
@ -118,7 +119,8 @@ public class BaseAttributesService implements AttributesService {
List<ListenableFuture<Long>> futures = new ArrayList<>(attributes.size());
for (AttributeKvEntry attribute : attributes) {
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
edqsService.onUpdate(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version));
return version;
}, MoreExecutors.directExecutor());
futures.add(future);
@ -136,7 +138,8 @@ public class BaseAttributesService implements AttributesService {
String key = keyVersionPair.getFirst();
Long version = keyVersionPair.getSecond();
if (version != null) {
edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
edqsService.onDelete(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
}
keys.add(key);
}

View File

@ -30,6 +30,7 @@ import org.springframework.stereotype.Service;
import org.thingsboard.server.cache.TbCacheValueWrapper;
import org.thingsboard.server.cache.VersionedTbCache;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.edqs.AttributeKv;
@ -239,7 +240,8 @@ public class CachedAttributesService implements AttributesService {
ListenableFuture<Long> future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> {
BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version);
put(entityId, scope, attributeKvEntry);
edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
edqsService.onUpdate(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version));
return version;
}, cacheExecutor);
futures.add(future);
@ -263,7 +265,8 @@ public class CachedAttributesService implements AttributesService {
Long version = keyVersionPair.getSecond();
cache.evict(new AttributeCacheKey(scope, entityId, key), version);
if (version != null) {
edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
edqsService.onDelete(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version));
}
return key;
}, cacheExecutor)).toList());

View File

@ -26,9 +26,13 @@ import org.owasp.validator.html.ScanException;
import org.thingsboard.server.common.data.validation.NoXss;
import java.util.Optional;
import java.util.regex.Pattern;
@Slf4j
public class NoXssValidator implements ConstraintValidator<NoXss, Object> {
private static final Pattern JS_TEMPLATE_PATTERN = Pattern.compile("\\{\\{.*}}", Pattern.DOTALL);
private static final AntiSamy xssChecker = new AntiSamy();
private static final Policy xssPolicy;
@ -59,6 +63,9 @@ public class NoXssValidator implements ConstraintValidator<NoXss, Object> {
if (stringValue.isEmpty()) {
return true;
}
if (JS_TEMPLATE_PATTERN.matcher(stringValue).find()) {
return false;
}
try {
return xssChecker.scan(stringValue, xssPolicy).getNumberOfErrors() == 0;
} catch (ScanException | PolicyException e) {

View File

@ -196,7 +196,8 @@ public class BaseTimeseriesService implements TimeseriesService {
if (saveLatest) {
latestFutures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), version -> {
if (version != null) {
edqsService.onUpdate(tenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version));
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
edqsService.onUpdate(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version));
}
return version;
}, MoreExecutors.directExecutor()));
@ -276,7 +277,8 @@ public class BaseTimeseriesService implements TimeseriesService {
return Futures.transform(timeseriesLatestDao.removeLatest(tenantId, entityId, query), result -> {
if (result.isRemoved()) {
Long version = result.getVersion();
edqsService.onDelete(tenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, query.getKey(), version));
TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId;
edqsService.onDelete(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, query.getKey(), version));
}
return result;
}, MoreExecutors.directExecutor());

View File

@ -486,6 +486,17 @@ public class DeviceServiceTest extends AbstractServiceTest {
});
}
@Test
public void testSaveDeviceWithJSInjection_thenDataValidationException() {
Device device = new Device();
device.setType("default");
device.setTenantId(tenantId);
device.setName("{{constructor.constructor('location.href=\"https://evil.com\"')()}}");
Assertions.assertThrows(DataValidationException.class, () -> {
deviceService.saveDevice(device);
});
}
@Test
public void testSaveDeviceWithInvalidTenant() {
Device device = new Device();

View File

@ -35,7 +35,14 @@ public class NoXssValidatorTest {
"<p><a href=\"http://htmlbook.ru/example/knob.html\">Link!!!</a></p>1221",
"<h3>Please log in to proceed</h3> <form action=http://192.168.149.128>Username:<br><input type=\"username\" name=\"username\"></br>Password:<br><input type=\"password\" name=\"password\"></br><br><input type=\"submit\" value=\"Log in\"></br>",
" <img src= \"http://site.com/\" > ",
"123 <input type=text value=a onfocus=alert(1337) AUTOFOCUS>bebe"
"123 <input type=text value=a onfocus=alert(1337) AUTOFOCUS>bebe",
"{{constructor.constructor('location.href=\"https://evil.com\"')()}}",
" {{constructor.constructor('alert(1)')()}}",
"{{}}",
"{{{constructor.constructor('location.href=\"https://evil.com\"')()}}}",
"test {{constructor.constructor('location.href=\"https://evil.com\"')()}} test",
"{{#if user}}Hello, {{user.name}}{{/if}}",
"{{ user.name }}"
})
public void givenEntityWithMaliciousPropertyValue_thenReturnValidationError(String maliciousString) {
Asset invalidAsset = new Asset();

View File

@ -114,9 +114,9 @@ redis:
# Minumum number of idle connections that can be maintained in the pool without being closed
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
# Enable/Disable PING command send when a connection is borrowed
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
# The property is used to specify whether to test the connection before returning it to the connection pool.
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
# The property is used in the context of connection pooling in Redis
testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
# Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds

View File

@ -147,9 +147,9 @@ redis:
# Minumum number of idle connections that can be maintained in the pool without being closed
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
# Enable/Disable PING command send when a connection is borrowed
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
# The property is used to specify whether to test the connection before returning it to the connection pool.
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
# The property is used in the context of connection pooling in Redis
testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
# Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds

View File

@ -114,9 +114,9 @@ redis:
# Minumum number of idle connections that can be maintained in the pool without being closed
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
# Enable/Disable PING command send when a connection is borrowed
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
# The property is used to specify whether to test the connection before returning it to the connection pool.
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
# The property is used in the context of connection pooling in Redis
testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
# Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds

View File

@ -115,9 +115,9 @@ redis:
# Minumum number of idle connections that can be maintained in the pool without being closed
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
# Enable/Disable PING command send when a connection is borrowed
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
# The property is used to specify whether to test the connection before returning it to the connection pool.
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
# The property is used in the context of connection pooling in Redis
testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
# Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds

View File

@ -114,9 +114,9 @@ redis:
# Minumum number of idle connections that can be maintained in the pool without being closed
minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
# Enable/Disable PING command send when a connection is borrowed
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}"
# The property is used to specify whether to test the connection before returning it to the connection pool.
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}"
# The property is used in the context of connection pooling in Redis
testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
# Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds

View File

@ -36,6 +36,7 @@ import {
import { AiModelService } from '@core/http/ai-model.service';
import { CheckConnectivityDialogComponent } from '@home/components/ai-model/check-connectivity-dialog.component';
import { map } from 'rxjs/operators';
import { deepTrim } from '@core/utils';
export interface AIModelDialogData {
AIModel?: AiModel;
@ -162,6 +163,6 @@ export class AIModelDialogComponent extends DialogComponent<AIModelDialogCompone
add(): void {
const aiModel = {...this.data.AIModel, ...this.aiModelForms.value} as AiModel;
this.aiModelService.saveAiModel(aiModel).subscribe(aiModel => this.dialogRef.close(aiModel));
this.aiModelService.saveAiModel(deepTrim(aiModel)).subscribe(aiModel => this.dialogRef.close(aiModel));
}
}

View File

@ -79,25 +79,24 @@
<div class="tb-form-panel-title" tb-hint-tooltip-icon="{{ getResponseFormatHint }}">
{{ 'rule-node-config.ai.response-format' | translate }}
</div>
<tb-toggle-select formControlName="type" [disabled]="disabledResponseFormatType">
<tb-toggle-select formControlName="type">
<tb-toggle-option [value]="responseFormat.TEXT">{{ 'rule-node-config.ai.response-text' | translate }}</tb-toggle-option>
<tb-toggle-option [value]="responseFormat.JSON">{{ 'rule-node-config.ai.response-json' | translate }}</tb-toggle-option>
<tb-toggle-option [value]="responseFormat.JSON_SCHEMA">{{ 'rule-node-config.ai.response-json-schema' | translate }}</tb-toggle-option>
</tb-toggle-select>
</div>
@if (aiConfigForm.get('responseFormat.type').value === responseFormat.JSON_SCHEMA) {
<tb-json-object-edit
jsonRequired
label="{{ 'rule-node-config.ai.response-json-schema' | translate }}"
formControlName="schema">
<button mat-icon-button class="tb-mat-32"
toolbarSuffixButton
matTooltip="{{ 'rule-node-config.ai.response-json-schema-hint' | translate }}"
matTooltipPosition="above">
<mat-icon class="material-icons">info_outline</mat-icon>
</button>
</tb-json-object-edit>
}
<tb-json-object-edit
jsonRequired
[class.!hidden]="aiConfigForm.get('responseFormat.type').value !== responseFormat.JSON_SCHEMA"
label="{{ 'rule-node-config.ai.response-json-schema' | translate }}"
formControlName="schema">
<button mat-icon-button class="tb-mat-32"
toolbarSuffixButton
matTooltip="{{ 'rule-node-config.ai.response-json-schema-hint' | translate }}"
matTooltipPosition="above">
<mat-icon class="material-icons">info_outline</mat-icon>
</button>
</tb-json-object-edit>
</div>
<div class="tb-form-panel stroked no-padding no-gap">

View File

@ -23,6 +23,7 @@ import { AIModelDialogComponent, AIModelDialogData } from '@home/components/ai-m
import { AiModel, AiRuleNodeResponseFormatTypeOnlyText, ResponseFormat } from '@shared/models/ai-model.models';
import { deepTrim } from '@core/utils';
import { TranslateService } from '@ngx-translate/core';
import { jsonRequired } from '@shared/components/json-object-edit.component';
@Component({
selector: 'tb-external-node-ai-config',
@ -37,8 +38,6 @@ export class AiConfigComponent extends RuleNodeConfigurationComponent {
responseFormat = ResponseFormat;
disabledResponseFormatType: boolean;
constructor(private fb: UntypedFormBuilder,
private translate: TranslateService,
private dialog: MatDialog) {
@ -56,7 +55,7 @@ export class AiConfigComponent extends RuleNodeConfigurationComponent {
userPrompt: [configuration?.userPrompt ?? '', [Validators.required, Validators.maxLength(10000), Validators.pattern(/.*\S.*/)]],
responseFormat: this.fb.group({
type: [configuration?.responseFormat?.type ?? ResponseFormat.JSON, []],
schema: [configuration?.responseFormat?.schema ?? null, [Validators.required]],
schema: [configuration?.responseFormat?.schema ?? null, [jsonRequired]],
}),
timeoutSeconds: [configuration?.timeoutSeconds ?? 60, []],
forceAck: [configuration?.forceAck ?? true, []]
@ -88,10 +87,10 @@ export class AiConfigComponent extends RuleNodeConfigurationComponent {
if (this.aiConfigForm.get('responseFormat.type').value !== ResponseFormat.TEXT) {
this.aiConfigForm.get('responseFormat.type').patchValue(ResponseFormat.TEXT, {emitEvent: true});
}
this.disabledResponseFormatType = true;
this.aiConfigForm.get('responseFormat.type').disable();
}
} else {
this.disabledResponseFormatType = false;
this.aiConfigForm.get('responseFormat.type').enable();
}
}

View File

@ -23,6 +23,11 @@ import { AiModel } from '@shared/models/ai-model.models';
@Component({
selector: 'tb-ai-model-table-header',
templateUrl: './ai-model-table-header.component.html',
styles: [`
:host {
width: 100%;
}
`],
styleUrls: []
})
export class AiModelTableHeaderComponent extends EntityTableHeaderComponent<AiModel> {