diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index eaef1f7c7d..7340696788 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -59,8 +59,7 @@ import org.thingsboard.server.gen.edge.v1.RequestMsg; import org.thingsboard.server.gen.edge.v1.ResponseMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.AfterStartUp; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -153,10 +152,7 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private TbCoreQueueFactory tbCoreQueueFactory; @Autowired - private Optional kafkaSettings; - - @Autowired - private Optional kafkaTopicConfigs; + private Optional kafkaAdmin; private Server server; @@ -232,8 +228,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } private EdgeGrpcSession createEdgeGrpcSession(StreamObserver outputStream) { - return kafkaSettings.isPresent() && kafkaTopicConfigs.isPresent() - ? new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaSettings.get(), kafkaTopicConfigs.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect, + return kafkaAdmin.isPresent() + ? 
new KafkaEdgeGrpcSession(ctx, topicService, tbCoreQueueFactory, kafkaAdmin.get(), outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession) : new PostgresEdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession); @@ -643,10 +639,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i List toRemove = new ArrayList<>(); for (EdgeGrpcSession session : sessions.values()) { if (session instanceof KafkaEdgeGrpcSession kafkaSession && - !kafkaSession.isConnected() && - kafkaSession.getConsumer() != null && - kafkaSession.getConsumer().getConsumer() != null && - !kafkaSession.getConsumer().getConsumer().isStopped()) { + !kafkaSession.isConnected() && + kafkaSession.getConsumer() != null && + kafkaSession.getConsumer().getConsumer() != null && + !kafkaSession.getConsumer().getConsumer().isStopped()) { toRemove.add(kafkaSession.getEdge().getId()); } } @@ -663,4 +659,5 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.warn("Failed to cleanup kafka sessions", e); } } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java index ab0b42abb4..d165be33d4 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/KafkaEdgeGrpcSession.java @@ -32,9 +32,7 @@ import org.thingsboard.server.queue.TbQueueConsumer; import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.common.consumer.QueueConsumerManager; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.service.edge.EdgeContextComponent; @@ -51,9 +49,7 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private final TopicService topicService; private final TbCoreQueueFactory tbCoreQueueFactory; - - private final TbKafkaSettings kafkaSettings; - private final TbKafkaTopicConfigs kafkaTopicConfigs; + private final KafkaAdmin kafkaAdmin; private volatile boolean isHighPriorityProcessing; @@ -63,21 +59,20 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { private ExecutorService consumerExecutor; public KafkaEdgeGrpcSession(EdgeContextComponent ctx, TopicService topicService, TbCoreQueueFactory tbCoreQueueFactory, - TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs, StreamObserver outputStream, + KafkaAdmin kafkaAdmin, StreamObserver outputStream, BiConsumer sessionOpenListener, BiConsumer sessionCloseListener, ScheduledExecutorService sendDownlinkExecutorService, int maxInboundMessageSize, int maxHighPriorityQueueSizePerSession) { super(ctx, outputStream, sessionOpenListener, sessionCloseListener, sendDownlinkExecutorService, maxInboundMessageSize, maxHighPriorityQueueSizePerSession); this.topicService = topicService; this.tbCoreQueueFactory = tbCoreQueueFactory; - this.kafkaSettings = kafkaSettings; - this.kafkaTopicConfigs 
= kafkaTopicConfigs; + this.kafkaAdmin = kafkaAdmin; } private void processMsgs(List> msgs, TbQueueConsumer> consumer) { log.trace("[{}][{}] starting processing edge events", tenantId, edge.getId()); if (!isConnected() || isSyncInProgress() || isHighPriorityProcessing) { log.debug("[{}][{}] edge not connected, edge sync is not completed or high priority processing in progress, " + - "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration", + "connected = {}, sync in progress = {}, high priority in progress = {}. Skipping iteration", tenantId, edge.getId(), isConnected(), isSyncInProgress(), isHighPriorityProcessing); return; } @@ -159,7 +154,6 @@ public class KafkaEdgeGrpcSession extends EdgeGrpcSession { @Override public void cleanUp() { String topic = topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edge.getId()).getTopic(); - TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); kafkaAdmin.deleteTopic(topic); kafkaAdmin.deleteConsumerGroup(topic); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java b/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java index 3fc391ec72..48b2a47cfb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/stats/EdgeStatsService.java @@ -35,7 +35,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; import org.thingsboard.server.dao.edge.stats.MsgCounters; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.util.TbCoreComponent; import java.util.Collections; @@ -63,7 +63,7 @@ public class EdgeStatsService { private final TimeseriesService tsService; private final EdgeStatsCounterService statsCounterService; private final TopicService topicService; - private final Optional tbKafkaAdmin; + private final Optional kafkaAdmin; @Value("${edges.stats.ttl:30}") private int edgesStatsTtlDays; @@ -81,12 +81,12 @@ public class EdgeStatsService { long ts = now - (now % reportIntervalMillis); Map countersByEdge = statsCounterService.getCounterByEdge(); - Map lagByEdgeId = tbKafkaAdmin.isPresent() ? getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap(); + Map lagByEdgeId = kafkaAdmin.isPresent() ? 
getEdgeLagByEdgeId(countersByEdge) : Collections.emptyMap(); Map countersByEdgeSnapshot = new HashMap<>(statsCounterService.getCounterByEdge()); countersByEdgeSnapshot.forEach((edgeId, counters) -> { TenantId tenantId = counters.getTenantId(); - if (tbKafkaAdmin.isPresent()) { + if (kafkaAdmin.isPresent()) { counters.getMsgsLag().set(lagByEdgeId.getOrDefault(edgeId, 0L)); } List statsEntries = List.of( @@ -109,7 +109,7 @@ public class EdgeStatsService { e -> topicService.buildEdgeEventNotificationsTopicPartitionInfo(e.getValue().getTenantId(), e.getKey()).getTopic() )); - Map lagByTopic = tbKafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values())); + Map lagByTopic = kafkaAdmin.get().getTotalLagForGroupsBulk(new HashSet<>(edgeToTopicMap.values())); return edgeToTopicMap.entrySet().stream() .collect(Collectors.toMap( diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index 43b0c575a0..5ded4f66c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -20,10 +20,8 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; +import org.thingsboard.server.queue.kafka.KafkaAdmin; -import java.util.Collections; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -33,8 +31,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { private final boolean syncNeeded; - public KafkaEdqsSyncService(TbKafkaSettings kafkaSettings, TopicService topicService, EdqsConfig edqsConfig) { - TbKafkaAdmin kafkaAdmin = new TbKafkaAdmin(kafkaSettings, Collections.emptyMap()); + public KafkaEdqsSyncService(KafkaAdmin kafkaAdmin, TopicService topicService, EdqsConfig edqsConfig) { this.syncNeeded = kafkaAdmin.areAllTopicsEmpty(IntStream.range(0, edqsConfig.getPartitions()) .mapToObj(partition -> TopicPartitionInfo.builder() .topic(topicService.buildTopicName(edqsConfig.getEventsTopic())) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java index 0d4cc26ce7..26f0d81182 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/queue/DefaultTbQueueService.java @@ -176,8 +176,8 @@ public class DefaultTbQueueService extends AbstractTbEntityService implements Tb for (int i = oldPartitions; i < newPartitions; i++) { tbQueueAdmin.createTopicIfNotExists( new TopicPartitionInfo(queue.getTopic(), queue.getTenantId(), i, false).getFullTopicName(), - queue.getCustomProperties() - ); + queue.getCustomProperties(), + true); // forcing topic creation because the topic may still be cached on some nodes } } diff --git a/application/src/main/java/org/thingsboard/server/service/notification/provider/DefaultFirebaseService.java b/application/src/main/java/org/thingsboard/server/service/notification/provider/DefaultFirebaseService.java index 
d1307fcd5d..8056b9fde2 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/provider/DefaultFirebaseService.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/provider/DefaultFirebaseService.java @@ -140,8 +140,10 @@ public class DefaultFirebaseService implements FirebaseService { } public void destroy() { - app.delete(); - app = null; + if (app != null) { + app.delete(); + app = null; + } messaging = null; log.debug("[{}] Destroyed FirebaseContext", key); } diff --git a/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java b/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java index b3fe76f154..45a73072f5 100644 --- a/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java +++ b/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java @@ -57,6 +57,7 @@ import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.sql.query.EntityKeyMapping; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.executors.DbCallbackExecutorService; @@ -224,7 +225,7 @@ public class DefaultEntityQueryService implements EntityQueryService { private EntityDataQuery buildEntityDataQuery(AlarmCountQuery query) { EntityDataPageLink edpl = new EntityDataPageLink(maxEntitiesPerAlarmSubscription, 0, null, - new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY))); + new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, EntityKeyMapping.CREATED_TIME))); return new EntityDataQuery(query.getEntityFilter(), edpl, null, null, query.getKeyFilters()); } @@ -232,7 +233,7 @@ public class DefaultEntityQueryService implements EntityQueryService { EntityDataSortOrder sortOrder = query.getPageLink().getSortOrder(); EntityDataSortOrder entitiesSortOrder; if (sortOrder == null || sortOrder.getKey().getType().equals(EntityKeyType.ALARM_FIELD)) { - entitiesSortOrder = new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, ModelConstants.CREATED_TIME_PROPERTY)); + entitiesSortOrder = new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, EntityKeyMapping.CREATED_TIME)); } else { entitiesSortOrder = sortOrder; } diff --git a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java index 6d06c85585..3354c63f9f 100644 --- a/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java +++ b/application/src/main/java/org/thingsboard/server/service/ttl/KafkaEdgeTopicsCleanUpService.java @@ -30,9 +30,7 @@ import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; -import org.thingsboard.server.queue.kafka.TbKafkaSettings; -import org.thingsboard.server.queue.kafka.TbKafkaTopicConfigs; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import 
org.thingsboard.server.queue.util.TbCoreComponent; import java.time.Instant; @@ -57,7 +55,7 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { private final TenantService tenantService; private final EdgeService edgeService; private final AttributesService attributesService; - private final TbKafkaAdmin kafkaAdmin; + private final KafkaAdmin kafkaAdmin; @Value("${sql.ttl.edge_events.edge_events_ttl:2628000}") private long ttlSeconds; @@ -67,13 +65,13 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { public KafkaEdgeTopicsCleanUpService(PartitionService partitionService, EdgeService edgeService, TenantService tenantService, AttributesService attributesService, - TopicService topicService, TbKafkaSettings kafkaSettings, TbKafkaTopicConfigs kafkaTopicConfigs) { + TopicService topicService, KafkaAdmin kafkaAdmin) { super(partitionService); this.topicService = topicService; this.tenantService = tenantService; this.edgeService = edgeService; this.attributesService = attributesService; - this.kafkaAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getEdgeEventConfigs()); + this.kafkaAdmin = kafkaAdmin; } @Scheduled(initialDelayString = "#{T(org.apache.commons.lang3.RandomUtils).nextLong(0, ${sql.ttl.edge_events.execution_interval_ms})}", fixedDelayString = "${sql.ttl.edge_events.execution_interval_ms}") @@ -82,8 +80,8 @@ public class KafkaEdgeTopicsCleanUpService extends AbstractCleanUpService { return; } - Set topics = kafkaAdmin.getAllTopics(); - if (topics == null || topics.isEmpty()) { + Set topics = kafkaAdmin.listTopics(); + if (topics.isEmpty()) { return; } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index c54553fff8..54c1442ea7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -742,9 +742,9 @@ redis: # Minumum number of idle connections that can be maintained in the pool without being closed minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" # Enable/Disable PING command sent when a connection is borrowed - testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}" + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" # The property is used to specify whether to test the connection before returning it to the connection pool. - testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}" + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" # The property is used in the context of connection pooling in Redis testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}" # Minimum time that an idle connection should be idle before it can be evicted from the connection pool. 
The value is set in milliseconds diff --git a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java index 89bbe5e499..655ab417e6 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EdqsEntityQueryControllerTest.java @@ -25,6 +25,9 @@ import org.thingsboard.server.common.data.edqs.EdqsState; import org.thingsboard.server.common.data.edqs.EdqsState.EdqsApiMode; import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest; import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.query.AlarmCountQuery; +import org.thingsboard.server.common.data.query.AlarmData; +import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataQuery; @@ -70,12 +73,24 @@ public class EdqsEntityQueryControllerTest extends EntityQueryControllerTest { result -> result.getTotalElements() == expectedResultSize); } + @Override + protected PageData findAlarmsByQueryAndCheck(AlarmDataQuery query, int expectedResultSize) { + return await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> findAlarmsByQuery(query), + result -> result.getTotalElements() == expectedResultSize); + } + @Override protected Long countByQueryAndCheck(EntityCountQuery query, long expectedResult) { return await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> countByQuery(query), result -> result == expectedResult); } + @Override + protected Long countAlarmsByQueryAndCheck(AlarmCountQuery query, long expectedResult) { + return await().atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> countAlarmsByQuery(query), + result -> result == expectedResult); + } + @Test public void testEdqsState() throws Exception { loginSysAdmin(); diff --git a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java index ee18543796..7b33c88062 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java @@ -278,8 +278,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest { assetTypeFilter.setEntityType(EntityType.ASSET); AlarmCountQuery assetAlarmQuery = new AlarmCountQuery(assetTypeFilter); - Long assetAlamCount = doPostWithResponse("/api/alarmsQuery/count", assetAlarmQuery, Long.class); - Assert.assertEquals(assets.size(), assetAlamCount.longValue()); + countAlarmsByQueryAndCheck(assetAlarmQuery, assets.size()); KeyFilter nameFilter = buildStringKeyFilter(EntityKeyType.ENTITY_FIELD, "name", StringFilterPredicate.StringOperation.STARTS_WITH, "Asset1"); List keyFilters = Collections.singletonList(nameFilter); @@ -369,8 +368,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest { assetTypeFilter.setEntityType(EntityType.ASSET); AlarmCountQuery assetAlarmQuery = new AlarmCountQuery(assetTypeFilter); - Long assetAlamCount = doPostWithResponse("/api/alarmsQuery/count", assetAlarmQuery, Long.class); - Assert.assertEquals(10, assetAlamCount.longValue()); + countAlarmsByQueryAndCheck(assetAlarmQuery, 
10); KeyFilter nameFilter = buildStringKeyFilter(EntityKeyType.ENTITY_FIELD, "name", StringFilterPredicate.StringOperation.STARTS_WITH, "Asset1"); List keyFilters = Collections.singletonList(nameFilter); @@ -438,9 +436,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest { assetTypeFilter.setEntityType(EntityType.ASSET); AlarmDataQuery assetAlarmQuery = new AlarmDataQuery(assetTypeFilter, pageLink, null, null, null, alarmFields); - PageData alarmPageData = doPostWithTypedResponse("/api/alarmsQuery/find", assetAlarmQuery, new TypeReference<>() { - }); - Assert.assertEquals(10, alarmPageData.getTotalElements()); + PageData alarmPageData = findAlarmsByQueryAndCheck(assetAlarmQuery, 10); List retrievedAlarmTypes = alarmPageData.getData().stream().map(Alarm::getType).toList(); assertThat(retrievedAlarmTypes).containsExactlyInAnyOrderElementsOf(assetAlarmTypes); @@ -511,9 +507,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest { assetTypeFilter.setEntityType(EntityType.ASSET); AlarmDataQuery assetAlarmQuery = new AlarmDataQuery(assetTypeFilter, pageLink, null, null, null, Collections.emptyList()); - PageData alarmPageData = doPostWithTypedResponse("/api/alarmsQuery/find", assetAlarmQuery, new TypeReference<>() { - }); - Assert.assertEquals(10, alarmPageData.getTotalElements()); + PageData alarmPageData = findAlarmsByQueryAndCheck(assetAlarmQuery, 10); List retrievedAlarmTypes = alarmPageData.getData().stream().map(Alarm::getType).toList(); assertThat(retrievedAlarmTypes).containsExactlyInAnyOrderElementsOf(assetAlarmTypes); @@ -1141,22 +1135,42 @@ public class EntityQueryControllerTest extends AbstractControllerTest { }); } + protected PageData findAlarmsByQuery(AlarmDataQuery query) throws Exception { + return doPostWithTypedResponse("/api/alarmsQuery/find", query, new TypeReference<>() {}); + } + protected PageData findByQueryAndCheck(EntityDataQuery query, int expectedResultSize) throws Exception { PageData result = findByQuery(query); assertThat(result.getTotalElements()).isEqualTo(expectedResultSize); return result; } + protected PageData findAlarmsByQueryAndCheck(AlarmDataQuery query, int expectedResultSize) throws Exception { + PageData result = findAlarmsByQuery(query); + assertThat(result.getTotalElements()).isEqualTo(expectedResultSize); + return result; + } + protected Long countByQuery(EntityCountQuery countQuery) throws Exception { return doPostWithResponse("/api/entitiesQuery/count", countQuery, Long.class); } + protected Long countAlarmsByQuery(AlarmCountQuery countQuery) throws Exception { + return doPostWithResponse("/api/alarmsQuery/count", countQuery, Long.class); + } + protected Long countByQueryAndCheck(EntityCountQuery query, long expectedResult) throws Exception { Long result = countByQuery(query); assertThat(result).isEqualTo(expectedResult); return result; } + protected Long countAlarmsByQueryAndCheck(AlarmCountQuery query, long expectedResult) throws Exception { + Long result = countAlarmsByQuery(query); + assertThat(result).isEqualTo(expectedResult); + return result; + } + private KeyFilter getEntityFieldStringEqualToKeyFilter(String keyName, String value) { KeyFilter tenantOwnerNameFilter = new KeyFilter(); tenantOwnerNameFilter.setKey(new EntityKey(EntityKeyType.ENTITY_FIELD, keyName)); diff --git a/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java b/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java index 933318ae59..25ff0f1b5d 100644 --- 
a/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java +++ b/application/src/test/java/org/thingsboard/server/service/edge/EdgeStatsTest.java @@ -33,7 +33,7 @@ import org.thingsboard.server.dao.edge.stats.EdgeStatsCounterService; import org.thingsboard.server.dao.edge.stats.MsgCounters; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.TopicService; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.service.edge.stats.EdgeStatsService; import java.util.List; @@ -141,7 +141,7 @@ public class EdgeStatsTest { TopicPartitionInfo partitionInfo = new TopicPartitionInfo(topic, tenantId, 0, false); when(topicService.buildEdgeEventNotificationsTopicPartitionInfo(tenantId, edgeId)).thenReturn(partitionInfo); - TbKafkaAdmin kafkaAdmin = mock(TbKafkaAdmin.class); + KafkaAdmin kafkaAdmin = mock(KafkaAdmin.class); when(kafkaAdmin.getTotalLagForGroupsBulk(Set.of(topic))) .thenReturn(Map.of(topic, 15L)); diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java index 14f90822bf..8e95a68110 100644 --- a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java @@ -79,6 +79,7 @@ import org.thingsboard.server.common.data.query.RelationsQueryFilter; import org.thingsboard.server.common.data.query.SingleEntityFilter; import org.thingsboard.server.common.data.query.StringFilterPredicate; import org.thingsboard.server.common.data.query.StringFilterPredicate.StringOperation; +import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntitySearchDirection; import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter; @@ -118,8 +119,10 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static org.thingsboard.server.common.data.AttributeScope.SERVER_SCOPE; import static org.thingsboard.server.common.data.query.EntityKeyType.ATTRIBUTE; import static org.thingsboard.server.common.data.query.EntityKeyType.ENTITY_FIELD; +import static org.thingsboard.server.common.data.query.EntityKeyType.SERVER_ATTRIBUTE; @Slf4j @DaoSqlTest @@ -604,7 +607,7 @@ public class EntityServiceTest extends AbstractControllerTest { List> attributeFutures = new ArrayList<>(); for (int i = 0; i < assets.size(); i++) { Asset asset = assets.get(i); - attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), AttributeScope.SERVER_SCOPE)); + attributeFutures.add(saveLongAttribute(asset.getId(), "consumption", consumptions.get(i), SERVER_SCOPE)); } Futures.allAsList(attributeFutures).get(); @@ -1745,6 +1748,33 @@ public class EntityServiceTest extends AbstractControllerTest { deviceService.deleteDevicesByTenantId(tenantId); } + @Test + public void testFindTenantTelemetry() { + // save timeseries by sys admin + BasicTsKvEntry timeseries = new BasicTsKvEntry(42L, new DoubleDataEntry("temperature", 45.5)); + timeseriesService.save(TenantId.SYS_TENANT_ID, tenantId, timeseries); + + AttributeKvEntry attr = new BaseAttributeKvEntry(new LongDataEntry("attr", 10L), 42L); + 
attributesService.save(TenantId.SYS_TENANT_ID, tenantId, SERVER_SCOPE, List.of(attr)); + + SingleEntityFilter singleEntityFilter = new SingleEntityFilter(); + singleEntityFilter.setSingleEntity(AliasEntityId.fromEntityId(tenantId)); + + List entityFields = List.of( + new EntityKey(ENTITY_FIELD, "name") + ); + List latestValues = List.of( + new EntityKey(EntityKeyType.TIME_SERIES, "temperature"), + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, "attr") + ); + + EntityDataPageLink pageLink = new EntityDataPageLink(1000, 0, null, null); + EntityDataQuery query = new EntityDataQuery(singleEntityFilter, pageLink, entityFields, latestValues, null); + + findByQueryAndCheckTelemetry(query, EntityKeyType.TIME_SERIES, "temperature", List.of("45.5")); + findByQueryAndCheckTelemetry(query, EntityKeyType.SERVER_ATTRIBUTE, "attr", List.of("10")); + } + @Test public void testBuildStringPredicateQueryOperations() throws ExecutionException, InterruptedException { diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index 09bd02e5a4..bcbe52b5c9 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -507,7 +507,7 @@ public class TbRuleEngineQueueConsumerManagerTest { consumerManager.delete(true); - await().atMost(2, TimeUnit.SECONDS) + await().atMost(5, TimeUnit.SECONDS) .untilAsserted(() -> { verify(ruleEngineMsgProducer).send(any(), any(), any()); }); diff --git a/common/cache/src/main/java/org/thingsboard/server/cache/TBRedisCacheConfiguration.java b/common/cache/src/main/java/org/thingsboard/server/cache/TBRedisCacheConfiguration.java index 8e06913d2c..8f51afed66 100644 --- a/common/cache/src/main/java/org/thingsboard/server/cache/TBRedisCacheConfiguration.java +++ b/common/cache/src/main/java/org/thingsboard/server/cache/TBRedisCacheConfiguration.java @@ -79,10 +79,10 @@ public abstract class TBRedisCacheConfiguration { @Value("${redis.pool_config.minIdle:16}") private int minIdle; - @Value("${redis.pool_config.testOnBorrow:true}") + @Value("${redis.pool_config.testOnBorrow:false}") private boolean testOnBorrow; - @Value("${redis.pool_config.testOnReturn:true}") + @Value("${redis.pool_config.testOnReturn:false}") private boolean testOnReturn; @Value("${redis.pool_config.testWhileIdle:true}") diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java index 9be50bb145..9210d4c1a8 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbEdgeQueueAdmin.java @@ -16,7 +16,7 @@ package org.thingsboard.server.queue; public interface TbEdgeQueueAdmin extends TbQueueAdmin { + void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId); - void deleteConsumerGroup(String consumerGroupId); } diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java index 48d9b3c34f..0b9925765c 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java +++ 
b/common/cluster-api/src/main/java/org/thingsboard/server/queue/TbQueueAdmin.java @@ -18,12 +18,13 @@ package org.thingsboard.server.queue; public interface TbQueueAdmin { default void createTopicIfNotExists(String topic) { - createTopicIfNotExists(topic, null); + createTopicIfNotExists(topic, null, false); } - void createTopicIfNotExists(String topic, String properties); + void createTopicIfNotExists(String topic, String properties, boolean force); void destroy(); void deleteTopic(String topic); + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/ai/model/AiModelConfig.java b/common/data/src/main/java/org/thingsboard/server/common/data/ai/model/AiModelConfig.java index 0a2b41a91f..bfaa29a6e3 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/ai/model/AiModelConfig.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/ai/model/AiModelConfig.java @@ -38,7 +38,7 @@ import org.thingsboard.server.common.data.ai.provider.OpenAiProviderConfig; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, + include = JsonTypeInfo.As.EXISTING_PROPERTY, property = "provider", visible = true ) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index 7e2e99e662..b34abe5363 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -43,7 +43,7 @@ import org.thingsboard.server.queue.edqs.EdqsConfig; import org.thingsboard.server.queue.edqs.EdqsExecutors; import org.thingsboard.server.queue.edqs.KafkaEdqsComponent; import org.thingsboard.server.queue.edqs.KafkaEdqsQueueFactory; -import org.thingsboard.server.queue.kafka.TbKafkaAdmin; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; import java.util.HashMap; @@ -68,6 +68,7 @@ public class KafkaEdqsStateService implements EdqsStateService { private final EdqsExecutors edqsExecutors; private final EdqsMapper mapper; private final TopicService topicService; + private final KafkaAdmin kafkaAdmin; @Autowired @Lazy private EdqsProcessor edqsProcessor; @@ -86,7 +87,6 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void init(PartitionedQueueConsumerManager> eventConsumer, List> otherConsumers) { versionsStore = new VersionsStore(config.getVersionsCacheTtl()); - TbKafkaAdmin queueAdmin = queueFactory.getEdqsQueueAdmin(); stateConsumer = PartitionedQueueConsumerManager.>create() .queueKey(new QueueKey(ServiceType.EDQS, config.getStateTopic())) .topic(topicService.buildTopicName(config.getStateTopic())) @@ -106,7 +106,7 @@ public class KafkaEdqsStateService implements EdqsStateService { consumer.commit(); }) .consumerCreator((config, tpi) -> queueFactory.createEdqsStateConsumer()) - .queueAdmin(queueAdmin) + .queueAdmin(queueFactory.getEdqsQueueAdmin()) .consumerExecutor(edqsExecutors.getConsumersExecutor()) .taskExecutor(edqsExecutors.getConsumerTaskExecutor()) .scheduler(edqsExecutors.getScheduler()) @@ -174,7 +174,7 @@ public class KafkaEdqsStateService implements EdqsStateService { // (because we need to be able to consume the same topic-partition by multiple instances) Map offsets = new HashMap<>(); try { - queueAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId()) + 
kafkaAdmin.getConsumerGroupOffsets(eventsToBackupKafkaConsumer.getGroupId()) .forEach((topicPartition, offsetAndMetadata) -> { offsets.put(topicPartition.topic(), offsetAndMetadata.offset()); }); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java index 6d78fa8b4f..ecb2a2b771 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/RuleEngineTbQueueAdminFactory.java @@ -43,7 +43,7 @@ public class RuleEngineTbQueueAdminFactory { return new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic, String properties) { + public void createTopicIfNotExists(String topic, String properties, boolean force) { } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java new file mode 100644 index 0000000000..6261e81497 --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/KafkaAdmin.java @@ -0,0 +1,285 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.kafka; + +import jakarta.annotation.PreDestroy; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.ConcurrentException; +import org.apache.commons.lang3.concurrent.LazyInitializer; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicExistsException; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Lazy; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.CachedValue; +import org.thingsboard.server.queue.util.TbKafkaComponent; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +@TbKafkaComponent +@Component +@Slf4j +public class KafkaAdmin { + + /* + * TODO: Get rid of per consumer/producer TbKafkaAdmin, + * use single KafkaAdmin instance that accepts topicConfigs. 
+ * */ + + private final TbKafkaSettings settings; + + @Value("${queue.kafka.request.timeout.ms:30000}") + private int requestTimeoutMs; + @Value("${queue.kafka.topics_cache_ttl_ms:300000}") // 5 minutes by default + private int topicsCacheTtlMs; + + private final LazyInitializer adminClient; + private final CachedValue> topics; + + public KafkaAdmin(@Lazy TbKafkaSettings settings) { + this.settings = settings; + this.adminClient = LazyInitializer.builder() + .setInitializer(() -> AdminClient.create(settings.toAdminProps())) + .get(); + this.topics = new CachedValue<>(() -> { + Set topics = ConcurrentHashMap.newKeySet(); + topics.addAll(listTopics()); + return topics; + }, topicsCacheTtlMs); + } + + public void createTopicIfNotExists(String topic, Map properties, boolean force) { + Set topics = getTopics(); + if (!force && topics.contains(topic)) { + log.trace("Topic {} already present in cache", topic); + return; + } + + log.debug("Creating topic {} with properties {}", topic, properties); + String numPartitionsStr = properties.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); + int partitions = numPartitionsStr != null ? Integer.parseInt(numPartitionsStr) : 1; + NewTopic newTopic = new NewTopic(topic, partitions, settings.getReplicationFactor()).configs(properties); + + try { + getClient().createTopics(List.of(newTopic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + topics.add(topic); + } catch (ExecutionException ee) { + log.trace("Failed to create topic {} with properties {}", topic, properties, ee); + if (ee.getCause() instanceof TopicExistsException) { + //do nothing + } else { + log.warn("[{}] Failed to create topic", topic, ee); + throw new RuntimeException(ee); + } + } catch (Exception e) { + log.warn("[{}] Failed to create topic", topic, e); + throw new RuntimeException(e); + } + } + + public void deleteTopic(String topic) { + log.debug("Deleting topic {}", topic); + try { + getClient().deleteTopics(List.of(topic)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.error("Failed to delete kafka topic [{}].", topic, e); + } + } + + private Set getTopics() { + return topics.get(); + } + + public Set listTopics() { + try { + Set topics = getClient().listTopics().names().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + log.trace("Listed topics: {}", topics); + return topics; + } catch (Exception e) { + log.error("Failed to get all topics.", e); + return Collections.emptySet(); + } + } + + public Map getTotalLagForGroupsBulk(Set groupIds) { + Map result = new HashMap<>(); + for (String groupId : groupIds) { + result.put(groupId, getTotalConsumerGroupLag(groupId)); + } + return result; + } + + public long getTotalConsumerGroupLag(String groupId) { + try { + Map committedOffsets = getConsumerGroupOffsets(groupId); + if (committedOffsets.isEmpty()) { + return 0L; + } + + Map latestOffsetsSpec = committedOffsets.keySet().stream() + .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); + + Map endOffsets = + getClient().listOffsets(latestOffsetsSpec).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + + return committedOffsets.entrySet().stream() + .mapToLong(entry -> { + TopicPartition tp = entry.getKey(); + long committed = entry.getValue().offset(); + long end = endOffsets.getOrDefault(tp, + new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset(); + return end - committed; + }).sum(); + + } catch (Exception e) { + log.error("Failed to get total lag for consumer group: {}", groupId, e); + return 0L; + } + } + 
+ @SneakyThrows + public Map getConsumerGroupOffsets(String groupId) { + return getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } + + /** + * Sync offsets from a fat group to a single-partition group + * Migration back from single-partition consumer to a fat group is not supported + * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers + * */ + public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { + try { + log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); + if (partitionId == null) { + return; + } + syncOffsetsUnsafe(fatGroupId, newGroupId, "." + partitionId); + } catch (Exception e) { + log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); + } + } + + public void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException { + Map oldOffsets = getConsumerGroupOffsets(fatGroupId); + if (oldOffsets.isEmpty()) { + return; + } + + for (var consumerOffset : oldOffsets.entrySet()) { + var tp = consumerOffset.getKey(); + if (!tp.topic().endsWith(topicSuffix)) { + continue; + } + var om = consumerOffset.getValue(); + Map newOffsets = getConsumerGroupOffsets(newGroupId); + + var existingOffset = newOffsets.get(tp); + if (existingOffset == null) { + log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); + } else if (existingOffset.offset() >= om.offset()) { + log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); + break; + } else { + log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); + } + getClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + log.info("[{}] altered new consumer groupId {}", tp, newGroupId); + break; + } + } + + public boolean isTopicEmpty(String topic) { + return areAllTopicsEmpty(Set.of(topic)); + } + + public boolean areAllTopicsEmpty(Set topics) { + try { + List existingTopics = getTopics().stream().filter(topics::contains).toList(); + if (existingTopics.isEmpty()) { + return true; + } + + List allPartitions = getClient().describeTopics(existingTopics).allTopicNames().get(requestTimeoutMs, TimeUnit.MILLISECONDS) + .entrySet().stream() + .flatMap(entry -> { + String topic = entry.getKey(); + TopicDescription topicDescription = entry.getValue(); + return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); + }) + .toList(); + + Map beginningOffsets = getClient().listOffsets(allPartitions.stream() + .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + Map endOffsets = getClient().listOffsets(allPartitions.stream() + .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + + for (TopicPartition partition : allPartitions) { + long beginningOffset = beginningOffsets.get(partition).offset(); + long endOffset = endOffsets.get(partition).offset(); + + if (beginningOffset != endOffset) { + log.debug("Partition [{}] of topic [{}] is not 
empty. Returning false.", partition.partition(), partition.topic()); + return false; + } + } + return true; + } catch (Exception e) { + log.error("Failed to check if topics [{}] empty.", topics, e); + return false; + } + } + + public void deleteConsumerGroup(String consumerGroupId) { + try { + getClient().deleteConsumerGroups(List.of(consumerGroupId)).all().get(requestTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + log.warn("Failed to delete consumer group {}", consumerGroupId, e); + } + } + + public AdminClient getClient() { + try { + return adminClient.get(); + } catch (ConcurrentException e) { + throw new RuntimeException("Failed to initialize Kafka admin client", e); + } + } + + @PreDestroy + private void destroy() throws Exception { + if (adminClient.isInitialized()) { + adminClient.get().close(); + } + } + +} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java index 47a7ef13e3..65e9d5e4c4 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaAdmin.java @@ -16,31 +16,12 @@ package org.thingsboard.server.queue.kafka; import lombok.Getter; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.admin.CreateTopicsResult; -import org.apache.kafka.clients.admin.ListOffsetsResult; -import org.apache.kafka.clients.admin.NewTopic; -import org.apache.kafka.clients.admin.OffsetSpec; -import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.TopicExistsException; import org.thingsboard.server.queue.TbEdgeQueueAdmin; import org.thingsboard.server.queue.TbQueueAdmin; import org.thingsboard.server.queue.util.PropertyUtils; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; /** * Created by ashvayka on 24.09.18. 
@@ -52,251 +33,42 @@ public class TbKafkaAdmin implements TbQueueAdmin, TbEdgeQueueAdmin { private final Map topicConfigs; @Getter private final int numPartitions; - private volatile Set topics; - - private final short replicationFactor; public TbKafkaAdmin(TbKafkaSettings settings, Map topicConfigs) { this.settings = settings; this.topicConfigs = topicConfigs; - String numPartitionsStr = topicConfigs.get(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); if (numPartitionsStr != null) { numPartitions = Integer.parseInt(numPartitionsStr); } else { numPartitions = 1; } - replicationFactor = settings.getReplicationFactor(); } @Override - public void createTopicIfNotExists(String topic, String properties) { - Set topics = getTopics(); - if (topics.contains(topic)) { - return; - } - try { - Map configs = PropertyUtils.getProps(topicConfigs, properties); - configs.remove(TbKafkaTopicConfigs.NUM_PARTITIONS_SETTING); - NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor).configs(configs); - createTopic(newTopic).values().get(topic).get(); - topics.add(topic); - } catch (ExecutionException ee) { - if (ee.getCause() instanceof TopicExistsException) { - //do nothing - } else { - log.warn("[{}] Failed to create topic", topic, ee); - throw new RuntimeException(ee); - } - } catch (Exception e) { - log.warn("[{}] Failed to create topic", topic, e); - throw new RuntimeException(e); - } + public void createTopicIfNotExists(String topic, String properties, boolean force) { + settings.getAdmin().createTopicIfNotExists(topic, PropertyUtils.getProps(topicConfigs, properties), force); } @Override public void deleteTopic(String topic) { - Set topics = getTopics(); - if (topics.remove(topic)) { - settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); - } else { - try { - if (settings.getAdminClient().listTopics().names().get().contains(topic)) { - settings.getAdminClient().deleteTopics(Collections.singletonList(topic)); - } else { - log.warn("Kafka topic [{}] does not exist.", topic); - } - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to delete kafka topic [{}].", topic, e); - } - } - } - - private Set getTopics() { - if (topics == null) { - synchronized (this) { - if (topics == null) { - topics = ConcurrentHashMap.newKeySet(); - try { - topics.addAll(settings.getAdminClient().listTopics().names().get()); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to get all topics.", e); - } - } - } - } - return topics; - } - - public Set getAllTopics() { - try { - return settings.getAdminClient().listTopics().names().get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to get all topics.", e); - } - return null; - } - - public CreateTopicsResult createTopic(NewTopic topic) { - return settings.getAdminClient().createTopics(Collections.singletonList(topic)); + settings.getAdmin().deleteTopic(topic); } @Override public void destroy() { } - /** - * Sync offsets from a fat group to a single-partition group - * Migration back from single-partition consumer to a fat group is not supported - * TODO: The best possible approach to synchronize the offsets is to do the synchronization as a part of the save Queue parameters with stop all consumers - * */ - public void syncOffsets(String fatGroupId, String newGroupId, Integer partitionId) { - try { - log.info("syncOffsets [{}][{}][{}]", fatGroupId, newGroupId, partitionId); - if (partitionId == null) { - return; - } - syncOffsetsUnsafe(fatGroupId, newGroupId, 
"." + partitionId); - } catch (Exception e) { - log.warn("Failed to syncOffsets from {} to {} partitionId {}", fatGroupId, newGroupId, partitionId, e); - } - } - /** * Sync edge notifications offsets from a fat group to a single group per edge * */ public void syncEdgeNotificationsOffsets(String fatGroupId, String newGroupId) { try { log.info("syncEdgeNotificationsOffsets [{}][{}]", fatGroupId, newGroupId); - syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId); + settings.getAdmin().syncOffsetsUnsafe(fatGroupId, newGroupId, newGroupId); } catch (Exception e) { log.warn("Failed to syncEdgeNotificationsOffsets from {} to {}", fatGroupId, newGroupId, e); } } - @Override - public void deleteConsumerGroup(String consumerGroupId) { - try { - settings.getAdminClient().deleteConsumerGroups(Collections.singletonList(consumerGroupId)); - } catch (Exception e) { - log.warn("Failed to delete consumer group {}", consumerGroupId, e); - } - } - - void syncOffsetsUnsafe(String fatGroupId, String newGroupId, String topicSuffix) throws ExecutionException, InterruptedException, TimeoutException { - Map oldOffsets = getConsumerGroupOffsets(fatGroupId); - if (oldOffsets.isEmpty()) { - return; - } - - for (var consumerOffset : oldOffsets.entrySet()) { - var tp = consumerOffset.getKey(); - if (!tp.topic().endsWith(topicSuffix)) { - continue; - } - var om = consumerOffset.getValue(); - Map newOffsets = getConsumerGroupOffsets(newGroupId); - - var existingOffset = newOffsets.get(tp); - if (existingOffset == null) { - log.info("[{}] topic offset does not exists in the new node group {}, all found offsets {}", tp, newGroupId, newOffsets); - } else if (existingOffset.offset() >= om.offset()) { - log.info("[{}] topic offset {} >= than old node group offset {}", tp, existingOffset.offset(), om.offset()); - break; - } else { - log.info("[{}] SHOULD alter topic offset [{}] less than old node group offset [{}]", tp, existingOffset.offset(), om.offset()); - } - settings.getAdminClient().alterConsumerGroupOffsets(newGroupId, Map.of(tp, om)).all().get(10, TimeUnit.SECONDS); - log.info("[{}] altered new consumer groupId {}", tp, newGroupId); - break; - } - } - - @SneakyThrows - public Map getConsumerGroupOffsets(String groupId) { - return settings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS); - } - - public boolean isTopicEmpty(String topic) { - return areAllTopicsEmpty(Set.of(topic)); - } - - public boolean areAllTopicsEmpty(Set topics) { - try { - List existingTopics = getTopics().stream().filter(topics::contains).toList(); - if (existingTopics.isEmpty()) { - return true; - } - - List allPartitions = settings.getAdminClient().describeTopics(existingTopics).topicNameValues().entrySet().stream() - .flatMap(entry -> { - String topic = entry.getKey(); - TopicDescription topicDescription; - try { - topicDescription = entry.getValue().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - return topicDescription.partitions().stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())); - }) - .toList(); - - Map beginningOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() - .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.earliest()))).all().get(); - Map endOffsets = settings.getAdminClient().listOffsets(allPartitions.stream() - .collect(Collectors.toMap(partition -> partition, partition -> OffsetSpec.latest()))).all().get(); - - for (TopicPartition 
partition : allPartitions) { - long beginningOffset = beginningOffsets.get(partition).offset(); - long endOffset = endOffsets.get(partition).offset(); - - if (beginningOffset != endOffset) { - log.debug("Partition [{}] of topic [{}] is not empty. Returning false.", partition.partition(), partition.topic()); - return false; - } - } - return true; - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to check if topics [{}] empty.", topics, e); - return false; - } - } - - public Map getTotalLagForGroupsBulk(Set groupIds) { - Map result = new HashMap<>(); - for (String groupId : groupIds) { - result.put(groupId, getTotalConsumerGroupLag(groupId)); - } - return result; - } - - public long getTotalConsumerGroupLag(String groupId) { - try { - Map committedOffsets = getConsumerGroupOffsets(groupId); - if (committedOffsets.isEmpty()) { - return 0L; - } - - Map latestOffsetsSpec = committedOffsets.keySet().stream() - .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest())); - - Map endOffsets = - settings.getAdminClient().listOffsets(latestOffsetsSpec) - .all().get(10, TimeUnit.SECONDS); - - return committedOffsets.entrySet().stream() - .mapToLong(entry -> { - TopicPartition tp = entry.getKey(); - long committed = entry.getValue().offset(); - long end = endOffsets.getOrDefault(tp, - new ListOffsetsResult.ListOffsetsResultInfo(0L, 0L, Optional.empty())).offset(); - return end - committed; - }).sum(); - - } catch (Exception e) { - log.error("Failed to get total lag for consumer group: {}", groupId, e); - return 0L; - } - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java index d44f9ee700..12a3ea8fa6 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatisticConfig.java @@ -19,11 +19,11 @@ import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.thingsboard.server.queue.util.TbKafkaComponent; @Component -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent @Getter @AllArgsConstructor @NoArgsConstructor diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java index 7a9c01b72f..e1cfb995c8 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerStatsService.java @@ -26,10 +26,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.queue.util.TbKafkaComponent; import java.time.Duration; 
import java.util.ArrayList; @@ -44,11 +44,12 @@ import java.util.concurrent.TimeUnit; @Slf4j @Component @RequiredArgsConstructor -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent public class TbKafkaConsumerStatsService { private final Set monitoredGroups = ConcurrentHashMap.newKeySet(); private final TbKafkaSettings kafkaSettings; + private final KafkaAdmin kafkaAdmin; private final TbKafkaConsumerStatisticConfig statsConfig; private Consumer consumer; @@ -77,7 +78,7 @@ public class TbKafkaConsumerStatsService { } for (String groupId : monitoredGroups) { try { - Map groupOffsets = kafkaSettings.getAdminClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() + Map groupOffsets = kafkaSettings.getAdmin().getClient().listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata() .get(statsConfig.getKafkaResponseTimeoutMs(), TimeUnit.MILLISECONDS); Map endOffsets = consumer.endOffsets(groupOffsets.keySet(), timeoutDuration); @@ -159,12 +160,14 @@ public class TbKafkaConsumerStatsService { @Override public String toString() { return "[" + - "topic=[" + topic + ']' + - ", partition=[" + partition + "]" + - ", committedOffset=[" + committedOffset + "]" + - ", endOffset=[" + endOffset + "]" + - ", lag=[" + lag + "]" + - "]"; + "topic=[" + topic + ']' + + ", partition=[" + partition + "]" + + ", committedOffset=[" + committedOffset + "]" + + ", endOffset=[" + endOffset + "]" + + ", lag=[" + lag + "]" + + "]"; } + } + } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java index 06ccfe3a69..11736f68cf 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaSettings.java @@ -16,12 +16,10 @@ package org.thingsboard.server.queue.kafka; import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; @@ -30,12 +28,13 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import org.thingsboard.server.common.data.TbProperty; import org.thingsboard.server.queue.util.PropertyUtils; +import org.thingsboard.server.queue.util.TbKafkaComponent; import java.util.HashMap; import java.util.LinkedHashMap; @@ -47,7 +46,7 @@ import java.util.Properties; * Created by ashvayka on 25.09.18. 
*/ @Slf4j -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent @ConfigurationProperties(prefix = "queue.kafka") @Component public class TbKafkaSettings { @@ -143,6 +142,9 @@ public class TbKafkaSettings { @Value("${queue.kafka.consumer-properties-per-topic-inline:}") private String consumerPropertiesPerTopicInline; + @Autowired + private KafkaAdmin kafkaAdmin; + @Deprecated @Setter private List other; @@ -150,8 +152,6 @@ public class TbKafkaSettings { @Setter private Map> consumerPropertiesPerTopic = new HashMap<>(); - private volatile AdminClient adminClient; - @PostConstruct public void initInlineTopicProperties() { Map> inlineProps = parseTopicPropertyList(consumerPropertiesPerTopicInline); @@ -240,15 +240,12 @@ public class TbKafkaSettings { } } - public AdminClient getAdminClient() { - if (adminClient == null) { - synchronized (this) { - if (adminClient == null) { - adminClient = AdminClient.create(toAdminProps()); - } - } - } - return adminClient; + /* + * Temporary solution to avoid major code changes. + * FIXME: use single instance of Kafka queue admin, don't create a separate one for each consumer/producer + * */ + public KafkaAdmin getAdmin() { + return kafkaAdmin; } protected Properties toAdminProps() { @@ -279,11 +276,4 @@ public class TbKafkaSettings { return result; } - @PreDestroy - private void destroy() { - if (adminClient != null) { - adminClient.close(); - } - } - } diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java index 5d5834d20a..c50fd0d720 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaTopicConfigs.java @@ -18,14 +18,14 @@ package org.thingsboard.server.queue.kafka; import jakarta.annotation.PostConstruct; import lombok.Getter; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.thingsboard.server.queue.util.PropertyUtils; +import org.thingsboard.server.queue.util.TbKafkaComponent; import java.util.Map; @Component -@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +@TbKafkaComponent public class TbKafkaTopicConfigs { public static final String NUM_PARTITIONS_SETTING = "partitions"; diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java index 75d53d9830..2c932534b1 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/InMemoryTbTransportQueueFactory.java @@ -78,7 +78,7 @@ public class InMemoryTbTransportQueueFactory implements TbTransportQueueFactory templateBuilder.queueAdmin(new TbQueueAdmin() { @Override - public void createTopicIfNotExists(String topic, String properties) {} + public void createTopicIfNotExists(String topic, String properties, boolean force) {} @Override public void destroy() {} diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java 
b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java index 866f8d235e..245330cc3e 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaMonolithQueueFactory.java @@ -58,6 +58,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; +import org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -83,6 +84,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final TopicService topicService; private final TbKafkaSettings kafkaSettings; + private final KafkaAdmin kafkaAdmin; private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueCoreSettings coreSettings; private final TbQueueRuleEngineSettings ruleEngineSettings; @@ -118,7 +120,9 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi private final AtomicLong consumerCount = new AtomicLong(); private final AtomicLong edgeConsumerCount = new AtomicLong(); - public KafkaMonolithQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, + public KafkaMonolithQueueFactory(TopicService topicService, + TbKafkaSettings kafkaSettings, + KafkaAdmin kafkaAdmin, TbServiceInfoProvider serviceInfoProvider, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -134,6 +138,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi TasksQueueConfig tasksQueueConfig) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; + this.kafkaAdmin = kafkaAdmin; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; @@ -240,7 +245,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi String queueName = configuration.getName(); String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); - ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java index 3b67ea4f9f..dbf4ab1aba 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/provider/KafkaTbRuleEngineQueueFactory.java @@ -52,6 +52,7 @@ import org.thingsboard.server.queue.common.TbProtoQueueMsg; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.discovery.TopicService; import org.thingsboard.server.queue.edqs.EdqsConfig; +import 
org.thingsboard.server.queue.kafka.KafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaAdmin; import org.thingsboard.server.queue.kafka.TbKafkaConsumerStatsService; import org.thingsboard.server.queue.kafka.TbKafkaConsumerTemplate; @@ -74,6 +75,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final TopicService topicService; private final TbKafkaSettings kafkaSettings; + private final KafkaAdmin kafkaAdmin; private final TbServiceInfoProvider serviceInfoProvider; private final TbQueueCoreSettings coreSettings; private final TbQueueRuleEngineSettings ruleEngineSettings; @@ -99,6 +101,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { private final AtomicLong consumerCount = new AtomicLong(); public KafkaTbRuleEngineQueueFactory(TopicService topicService, TbKafkaSettings kafkaSettings, + KafkaAdmin kafkaAdmin, TbServiceInfoProvider serviceInfoProvider, TbQueueCoreSettings coreSettings, TbQueueRuleEngineSettings ruleEngineSettings, @@ -111,6 +114,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { TbKafkaTopicConfigs kafkaTopicConfigs) { this.topicService = topicService; this.kafkaSettings = kafkaSettings; + this.kafkaAdmin = kafkaAdmin; this.serviceInfoProvider = serviceInfoProvider; this.coreSettings = coreSettings; this.ruleEngineSettings = ruleEngineSettings; @@ -234,7 +238,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory { String queueName = configuration.getName(); String groupId = topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, partitionId); - ruleEngineAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId + kafkaAdmin.syncOffsets(topicService.buildConsumerGroupId("re-", configuration.getTenantId(), queueName, null), // the fat groupId groupId, partitionId); TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder> consumerBuilder = TbKafkaConsumerTemplate.builder(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/util/TbKafkaComponent.java b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbKafkaComponent.java new file mode 100644 index 0000000000..ad4862e36d --- /dev/null +++ b/common/queue/src/main/java/org/thingsboard/server/queue/util/TbKafkaComponent.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
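Both Kafka queue factories now delegate the offset hand-over to kafkaAdmin.syncOffsets(fatGroupId, perPartitionGroupId, partitionId); the body of syncOffsets is not shown in this patch. Conceptually it copies the committed position of the shared ("fat") consumer group over to the new per-partition group before that group's consumer starts. A rough sketch of that idea with the stock AdminClient API — names and method shape are illustrative, not the PR's implementation:

```java
// Conceptual sketch only: copy the committed offset of one partition from a source
// consumer group to a target group. Not the PR's actual syncOffsets implementation.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class OffsetSyncSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // group ids, topic and partition are placeholders for illustration
            copyPartitionOffset(admin, "re-fat-group", "re-fat-group-0", "example.topic", 0);
        }
    }

    static void copyPartitionOffset(AdminClient admin, String fromGroup, String toGroup,
                                    String topic, int partition) throws Exception {
        TopicPartition tp = new TopicPartition(topic, partition);
        // Read the committed offset of the source ("fat") group.
        Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(fromGroup).partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
        OffsetAndMetadata offset = committed.get(tp);
        if (offset == null) {
            return; // nothing committed for this partition yet
        }
        // Write the same offset under the target per-partition group.
        // Note: this call only succeeds while the target group has no active members,
        // which is presumably why the sync happens before the consumer subscribes.
        admin.alterConsumerGroupOffsets(toGroup, Map.of(tp, offset)).all().get(10, TimeUnit.SECONDS);
    }
}
```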
+ */ +package org.thingsboard.server.queue.util; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; + +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Inherited +@Retention(RetentionPolicy.RUNTIME) +@Target({java.lang.annotation.ElementType.TYPE, java.lang.annotation.ElementType.METHOD}) +@ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") +public @interface TbKafkaComponent {} diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java index ad026c63aa..bc37982245 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/kafka/TbKafkaSettingsTest.java @@ -28,7 +28,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.spy; -@SpringBootTest(classes = TbKafkaSettings.class) +@SpringBootTest(classes = {TbKafkaSettings.class, KafkaAdmin.class}) @TestPropertySource(properties = { "queue.type=kafka", "queue.kafka.bootstrap.servers=localhost:9092", diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/config/ssl/SslCredentialsWebServerCustomizer.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/config/ssl/SslCredentialsWebServerCustomizer.java index 6c73bb03de..fc64be6e8b 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/config/ssl/SslCredentialsWebServerCustomizer.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/config/ssl/SslCredentialsWebServerCustomizer.java @@ -59,6 +59,7 @@ public class SslCredentialsWebServerCustomizer implements WebServerFactoryCustom public void customize(ConfigurableServletWebServerFactory factory) { SslCredentials sslCredentials = this.httpServerSslCredentialsConfig.getCredentials(); Ssl ssl = serverProperties.getSsl(); + ssl.setBundle("default"); ssl.setKeyAlias(sslCredentials.getKeyAlias()); ssl.setKeyPassword(sslCredentials.getKeyPassword()); factory.setSsl(ssl); diff --git a/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java new file mode 100644 index 0000000000..b0a41c2a42 --- /dev/null +++ b/common/util/src/main/java/org/thingsboard/common/util/CachedValue.java @@ -0,0 +1,38 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
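The new @TbKafkaComponent meta-annotation simply carries the @ConditionalOnProperty(prefix = "queue", value = "type", havingValue = "kafka") condition, so Kafka-only beans no longer repeat it on every class. A hedged sketch of how a bean opts in — the bean class below is invented for illustration:

```java
// Illustrative bean only: shows a Kafka-only component opting in via the composed
// annotation instead of repeating @ConditionalOnProperty on the class itself.
import org.springframework.stereotype.Component;
import org.thingsboard.server.queue.util.TbKafkaComponent;

@Component
@TbKafkaComponent // bean is created only when queue.type=kafka
public class ExampleKafkaOnlyBean {
    // Spring skips this bean entirely for in-memory or other queue types,
    // exactly as the former per-class @ConditionalOnProperty declaration did.
}
```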
+ */ +package org.thingsboard.common.util; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +public class CachedValue { + + private final LoadingCache cache; + + public CachedValue(Supplier supplier, long valueTtlMs) { + this.cache = Caffeine.newBuilder() + .expireAfterWrite(valueTtlMs, TimeUnit.MILLISECONDS) + .build(__ -> supplier.get()); + } + + public V get() { + return cache.get(this); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 9803670d4b..62b35caa7f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -26,6 +26,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.edqs.AttributeKv; @@ -118,7 +119,8 @@ public class BaseAttributesService implements AttributesService { List> futures = new ArrayList<>(attributes.size()); for (AttributeKvEntry attribute : attributes) { ListenableFuture future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> { - edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); + TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId; + edqsService.onUpdate(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attribute, version)); return version; }, MoreExecutors.directExecutor()); futures.add(future); @@ -136,7 +138,8 @@ public class BaseAttributesService implements AttributesService { String key = keyVersionPair.getFirst(); Long version = keyVersionPair.getSecond(); if (version != null) { - edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); + TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? 
(TenantId) entityId : tenantId; + edqsService.onDelete(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); } keys.add(key); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index d99413f13a..44b2daaf8e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -30,6 +30,7 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.cache.TbCacheValueWrapper; import org.thingsboard.server.cache.VersionedTbCache; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ObjectType; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.edqs.AttributeKv; @@ -239,7 +240,8 @@ public class CachedAttributesService implements AttributesService { ListenableFuture future = Futures.transform(attributesDao.save(tenantId, entityId, scope, attribute), version -> { BaseAttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(((BaseAttributeKvEntry) attribute).getKv(), attribute.getLastUpdateTs(), version); put(entityId, scope, attributeKvEntry); - edqsService.onUpdate(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); + TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId; + edqsService.onUpdate(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, attributeKvEntry, version)); return version; }, cacheExecutor); futures.add(future); @@ -263,7 +265,8 @@ public class CachedAttributesService implements AttributesService { Long version = keyVersionPair.getSecond(); cache.evict(new AttributeCacheKey(scope, entityId, key), version); if (version != null) { - edqsService.onDelete(tenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); + TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? 
(TenantId) entityId : tenantId; + edqsService.onDelete(edqsTenantId, ObjectType.ATTRIBUTE_KV, new AttributeKv(entityId, scope, key, version)); } return key; }, cacheExecutor)).toList()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/service/NoXssValidator.java b/dao/src/main/java/org/thingsboard/server/dao/service/NoXssValidator.java index 1fbe682831..ff71be4298 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/service/NoXssValidator.java +++ b/dao/src/main/java/org/thingsboard/server/dao/service/NoXssValidator.java @@ -26,9 +26,13 @@ import org.owasp.validator.html.ScanException; import org.thingsboard.server.common.data.validation.NoXss; import java.util.Optional; +import java.util.regex.Pattern; @Slf4j public class NoXssValidator implements ConstraintValidator { + + private static final Pattern JS_TEMPLATE_PATTERN = Pattern.compile("\\{\\{.*}}", Pattern.DOTALL); + private static final AntiSamy xssChecker = new AntiSamy(); private static final Policy xssPolicy; @@ -59,6 +63,9 @@ public class NoXssValidator implements ConstraintValidator { if (stringValue.isEmpty()) { return true; } + if (JS_TEMPLATE_PATTERN.matcher(stringValue).find()) { + return false; + } try { return xssChecker.scan(stringValue, xssPolicy).getNumberOfErrors() == 0; } catch (ScanException | PolicyException e) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index cecf4ab587..ceb7fcf822 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -196,7 +196,8 @@ public class BaseTimeseriesService implements TimeseriesService { if (saveLatest) { latestFutures.add(Futures.transform(timeseriesLatestDao.saveLatest(tenantId, entityId, tsKvEntry), version -> { if (version != null) { - edqsService.onUpdate(tenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version)); + TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? (TenantId) entityId : tenantId; + edqsService.onUpdate(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, version)); } return version; }, MoreExecutors.directExecutor())); @@ -276,7 +277,8 @@ public class BaseTimeseriesService implements TimeseriesService { return Futures.transform(timeseriesLatestDao.removeLatest(tenantId, entityId, query), result -> { if (result.isRemoved()) { Long version = result.getVersion(); - edqsService.onDelete(tenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, query.getKey(), version)); + TenantId edqsTenantId = entityId.getEntityType() == EntityType.TENANT ? 
(TenantId) entityId : tenantId; + edqsService.onDelete(edqsTenantId, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, query.getKey(), version)); } return result; }, MoreExecutors.directExecutor()); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java index bbbd48aa49..32767043d7 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/DeviceServiceTest.java @@ -486,6 +486,17 @@ public class DeviceServiceTest extends AbstractServiceTest { }); } + @Test + public void testSaveDeviceWithJSInjection_thenDataValidationException() { + Device device = new Device(); + device.setType("default"); + device.setTenantId(tenantId); + device.setName("{{constructor.constructor('location.href=\"https://evil.com\"')()}}"); + Assertions.assertThrows(DataValidationException.class, () -> { + deviceService.saveDevice(device); + }); + } + @Test public void testSaveDeviceWithInvalidTenant() { Device device = new Device(); diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java index 4877938d89..34da80e2db 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/NoXssValidatorTest.java @@ -35,7 +35,14 @@ public class NoXssValidatorTest { "

Link!!! 1221", "Please log in to proceed Username: Password:
", " ", - "123 bebe" + "123 bebe", + "{{constructor.constructor('location.href=\"https://evil.com\"')()}}", + " {{constructor.constructor('alert(1)')()}}", + "{{}}", + "{{{constructor.constructor('location.href=\"https://evil.com\"')()}}}", + "test {{constructor.constructor('location.href=\"https://evil.com\"')()}} test", + "{{#if user}}Hello, {{user.name}}{{/if}}", + "{{ user.name }}" }) public void givenEntityWithMaliciousPropertyValue_thenReturnValidationError(String maliciousString) { Asset invalidAsset = new Asset(); diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index c54bf038f2..f40a09c753 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -114,9 +114,9 @@ redis: # Minumum number of idle connections that can be maintained in the pool without being closed minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" # Enable/Disable PING command send when a connection is borrowed - testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}" + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" # The property is used to specify whether to test the connection before returning it to the connection pool. - testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}" + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" # The property is used in the context of connection pooling in Redis testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}" # Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index 3a3725edef..587894d5ce 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -147,9 +147,9 @@ redis: # Minumum number of idle connections that can be maintained in the pool without being closed minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" # Enable/Disable PING command send when a connection is borrowed - testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}" + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" # The property is used to specify whether to test the connection before returning it to the connection pool. - testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}" + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" # The property is used in the context of connection pooling in Redis testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}" # Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 60568a6a4e..0895bfa676 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -114,9 +114,9 @@ redis: # Minumum number of idle connections that can be maintained in the pool without being closed minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" # Enable/Disable PING command send when a connection is borrowed - testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}" + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" # The property is used to specify whether to test the connection before returning it to the connection pool. 
- testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}" + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" # The property is used in the context of connection pooling in Redis testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}" # Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index 55781abeeb..fae10cc892 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -115,9 +115,9 @@ redis: # Minumum number of idle connections that can be maintained in the pool without being closed minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" # Enable/Disable PING command send when a connection is borrowed - testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}" + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" # The property is used to specify whether to test the connection before returning it to the connection pool. - testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}" + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" # The property is used in the context of connection pooling in Redis testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}" # Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. Value set in milliseconds diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 746e8f2173..79aee31921 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -114,9 +114,9 @@ redis: # Minumum number of idle connections that can be maintained in the pool without being closed minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}" # Enable/Disable PING command send when a connection is borrowed - testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}" + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:false}" # The property is used to specify whether to test the connection before returning it to the connection pool. - testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}" + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:false}" # The property is used in the context of connection pooling in Redis testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}" # Minimum amount of time that an idle connection should be idle before it can be evicted from the connection pool. 
Value set in milliseconds diff --git a/ui-ngx/src/app/modules/home/components/ai-model/ai-model-dialog.component.ts b/ui-ngx/src/app/modules/home/components/ai-model/ai-model-dialog.component.ts index db5d1d7e23..c459d66f12 100644 --- a/ui-ngx/src/app/modules/home/components/ai-model/ai-model-dialog.component.ts +++ b/ui-ngx/src/app/modules/home/components/ai-model/ai-model-dialog.component.ts @@ -36,6 +36,7 @@ import { import { AiModelService } from '@core/http/ai-model.service'; import { CheckConnectivityDialogComponent } from '@home/components/ai-model/check-connectivity-dialog.component'; import { map } from 'rxjs/operators'; +import { deepTrim } from '@core/utils'; export interface AIModelDialogData { AIModel?: AiModel; @@ -162,6 +163,6 @@ export class AIModelDialogComponent extends DialogComponent this.dialogRef.close(aiModel)); + this.aiModelService.saveAiModel(deepTrim(aiModel)).subscribe(aiModel => this.dialogRef.close(aiModel)); } } diff --git a/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.html b/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.html index 80519cea28..f590dae84f 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.html +++ b/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.html @@ -79,25 +79,24 @@
{{ 'rule-node-config.ai.response-format' | translate }}
- + {{ 'rule-node-config.ai.response-text' | translate }} {{ 'rule-node-config.ai.response-json' | translate }} {{ 'rule-node-config.ai.response-json-schema' | translate }} - @if (aiConfigForm.get('responseFormat.type').value === responseFormat.JSON_SCHEMA) { - - - - } + + +
diff --git a/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.ts b/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.ts index 47313da81d..1ef8ebca72 100644 --- a/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.ts +++ b/ui-ngx/src/app/modules/home/components/rule-node/external/ai-config.component.ts @@ -23,6 +23,7 @@ import { AIModelDialogComponent, AIModelDialogData } from '@home/components/ai-m import { AiModel, AiRuleNodeResponseFormatTypeOnlyText, ResponseFormat } from '@shared/models/ai-model.models'; import { deepTrim } from '@core/utils'; import { TranslateService } from '@ngx-translate/core'; +import { jsonRequired } from '@shared/components/json-object-edit.component'; @Component({ selector: 'tb-external-node-ai-config', @@ -37,8 +38,6 @@ export class AiConfigComponent extends RuleNodeConfigurationComponent { responseFormat = ResponseFormat; - disabledResponseFormatType: boolean; - constructor(private fb: UntypedFormBuilder, private translate: TranslateService, private dialog: MatDialog) { @@ -56,7 +55,7 @@ export class AiConfigComponent extends RuleNodeConfigurationComponent { userPrompt: [configuration?.userPrompt ?? '', [Validators.required, Validators.maxLength(10000), Validators.pattern(/.*\S.*/)]], responseFormat: this.fb.group({ type: [configuration?.responseFormat?.type ?? ResponseFormat.JSON, []], - schema: [configuration?.responseFormat?.schema ?? null, [Validators.required]], + schema: [configuration?.responseFormat?.schema ?? null, [jsonRequired]], }), timeoutSeconds: [configuration?.timeoutSeconds ?? 60, []], forceAck: [configuration?.forceAck ?? true, []] @@ -88,10 +87,10 @@ export class AiConfigComponent extends RuleNodeConfigurationComponent { if (this.aiConfigForm.get('responseFormat.type').value !== ResponseFormat.TEXT) { this.aiConfigForm.get('responseFormat.type').patchValue(ResponseFormat.TEXT, {emitEvent: true}); } - this.disabledResponseFormatType = true; + this.aiConfigForm.get('responseFormat.type').disable(); } } else { - this.disabledResponseFormatType = false; + this.aiConfigForm.get('responseFormat.type').enable(); } } diff --git a/ui-ngx/src/app/modules/home/pages/ai-model/ai-model-table-header.component.ts b/ui-ngx/src/app/modules/home/pages/ai-model/ai-model-table-header.component.ts index 48889dc877..013642dcb2 100644 --- a/ui-ngx/src/app/modules/home/pages/ai-model/ai-model-table-header.component.ts +++ b/ui-ngx/src/app/modules/home/pages/ai-model/ai-model-table-header.component.ts @@ -23,6 +23,11 @@ import { AiModel } from '@shared/models/ai-model.models'; @Component({ selector: 'tb-ai-model-table-header', templateUrl: './ai-model-table-header.component.html', + styles: [` + :host { + width: 100%; + } + `], styleUrls: [] }) export class AiModelTableHeaderComponent extends EntityTableHeaderComponent {