From 31124311a3108300c014feafb8ee0076df22f027 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 4 Aug 2022 14:59:15 +0300 Subject: [PATCH] Device State performance improvements --- .../AbstractPartitionBasedService.java | 32 +++++++--- .../state/DefaultDeviceStateService.java | 50 +++++++++++++-- .../state/DefaultDeviceStateServiceTest.java | 8 +-- .../server/dao/device/DeviceService.java | 2 + .../server/dao/util/DbTypeInfoComponent.java | 22 +++++++ .../dao/util/DefaultDbTypeInfoComponent.java | 33 ++++++++++ .../server/dao/device/DeviceDao.java | 8 +++ .../server/dao/device/DeviceServiceImpl.java | 7 ++ .../device/DefaultNativeDeviceRepository.java | 64 +++++++++++++++++++ .../dao/sql/device/DeviceRepository.java | 3 - .../server/dao/sql/device/JpaDeviceDao.java | 13 +++- .../sql/device/NativeDeviceRepository.java | 26 ++++++++ docker/tb-node.env | 2 - .../src/main/resources/tb-coap-transport.yml | 1 - .../src/main/resources/tb-http-transport.yml | 1 - .../src/main/resources/tb-lwm2m-transport.yml | 1 - .../src/main/resources/tb-mqtt-transport.yml | 1 - .../src/main/resources/tb-snmp-transport.yml | 1 - 18 files changed, 241 insertions(+), 34 deletions(-) create mode 100644 common/dao-api/src/main/java/org/thingsboard/server/dao/util/DbTypeInfoComponent.java create mode 100644 common/dao-api/src/main/java/org/thingsboard/server/dao/util/DefaultDbTypeInfoComponent.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java create mode 100644 dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeDeviceRepository.java diff --git a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java index 1958e03b19..56a78474a1 100644 --- a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java +++ b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -116,8 +117,9 @@ public abstract class AbstractPartitionBasedService extends log.info("[{}] REMOVED PARTITIONS: {}", getServiceName(), removedPartitions); + boolean partitionListChanged = false; // We no longer manage current partition of entities; - removedPartitions.forEach(partition -> { + for (var partition : removedPartitions) { Set entities = partitionedEntities.remove(partition); if (entities != null) { entities.forEach(this::cleanupEntityOnPartitionRemoval); @@ -126,7 +128,8 @@ public abstract class AbstractPartitionBasedService extends if (fetchTasks != null) { fetchTasks.forEach(f -> f.cancel(true)); } - }); + partitionListChanged = true; + } onRepartitionEvent(); @@ -136,21 +139,30 @@ public abstract class AbstractPartitionBasedService extends var fetchTasks = onAddedPartitions(addedPartitions); if (fetchTasks != null && !fetchTasks.isEmpty()) { partitionedFetchTasks.putAll(fetchTasks); - List> futures = new ArrayList<>(); - fetchTasks.values().forEach(futures::addAll); - DonAsynchron.withCallback(Futures.allAsList(futures), - t -> logPartitions(), e -> log.error("Partition fetch task error", e)); - } else { - logPartitions(); } - } else { - logPartitions(); + partitionListChanged = true; + } + + if (partitionListChanged) { + List> partitionFetchFutures = new ArrayList<>(); + partitionedFetchTasks.values().forEach(partitionFetchFutures::addAll); + DonAsynchron.withCallback(Futures.allAsList(partitionFetchFutures), t -> logPartitions(), this::logFailure); } } catch (Throwable t) { log.warn("[{}] Failed to init entities state from DB", getServiceName(), t); } } + private void logFailure(Throwable e) { + if (e instanceof CancellationException) { + //Probably this is fine and happens due to re-balancing. + log.trace("Partition fetch task error", e); + } else { + log.error("Partition fetch task error", e); + } + + } + private void logPartitions() { log.info("[{}] Managing following partitions:", getServiceName()); partitionedEntities.forEach((tpi, entities) -> { diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index e34ef5ebe0..667323c487 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; @@ -64,6 +65,7 @@ import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.sql.query.EntityQueryRepository; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.dao.util.DbTypeInfoComponent; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; @@ -81,6 +83,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -89,6 +92,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -144,6 +148,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService partition : Lists.partition(entry.getValue(), 1000)) { log.info("[{}] Submit task for device states: {}", entry.getKey(), partition.size()); var devicePackFuture = deviceStateExecutor.submit(() -> { - var states = fetchDeviceStateData(partition); + List states; + if (persistToTelemetry && !dbTypeInfoComponent.isLatestTsDaoStoredToSql()) { + states = fetchDeviceStateDataUsingSeparateRequests(partition); + } else { + states = fetchDeviceStateDataUsingEntityDataQuery(partition); + } for (var state : states) { addDeviceUsingState(entry.getKey(), state); checkAndUpdateState(state.getDeviceId(), state); @@ -445,10 +458,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService fetchDeviceStateData(List deviceIds) { + private List fetchDeviceStateDataUsingSeparateRequests(List deviceIds) { + List devices = deviceService.findDevicesByIds(deviceIds.stream().map(DeviceIdInfo::getDeviceId).collect(Collectors.toList())); + List> deviceStateFutures = new ArrayList<>(); + for (Device device : devices) { + deviceStateFutures.add(fetchDeviceState(device)); + } + try { + List result = Futures.successfulAsList(deviceStateFutures).get(5, TimeUnit.MINUTES); + boolean success = true; + for (int i = 0; i < result.size(); i++) { + success = false; + if (result.get(i) == null) { + DeviceIdInfo deviceIdInfo = deviceIds.get(i); + log.warn("[{}][{}] Failed to initialized device state due to:", deviceIdInfo.getTenantId(), deviceIdInfo.getDeviceId()); + } + } + return success ? result : result.stream().filter(Objects::nonNull).collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.warn("Failed to initialized device state futures for ids: {} due to:", deviceIds, e); + throw new RuntimeException(e); + } + } + + private List fetchDeviceStateDataUsingEntityDataQuery(List deviceIds) { EntityListFilter ef = new EntityListFilter(); ef.setEntityType(EntityType.DEVICE); ef.setEntityList(deviceIds.stream().map(DeviceIdInfo::getDeviceId).map(DeviceId::getId).map(UUID::toString).collect(Collectors.toList())); diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 17e562fccf..d5d74f2512 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -63,7 +63,7 @@ public class DefaultDeviceStateServiceTest { @Before public void setUp() { - service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null)); + service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null, null)); } @Test @@ -71,16 +71,16 @@ public class DefaultDeviceStateServiceTest { service.deviceStates.put(deviceId, deviceStateDataMock); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); assertThat(deviceStateData, is(deviceStateDataMock)); - Mockito.verify(service, never()).fetchDeviceStateData(deviceId); + Mockito.verify(service, never()).fetchDeviceStateDataUsingEntityDataQuery(deviceId); } @Test public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() { service.deviceStates.clear(); - willReturn(deviceStateDataMock).given(service).fetchDeviceStateData(deviceId); + willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingEntityDataQuery(deviceId); DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId); assertThat(deviceStateData, is(deviceStateDataMock)); - Mockito.verify(service, times(1)).fetchDeviceStateData(deviceId); + Mockito.verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId); } } \ No newline at end of file diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java index 8f0c4b6451..d68e8e1bf6 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java @@ -81,6 +81,8 @@ public interface DeviceService { ListenableFuture> findDevicesByTenantIdAndIdsAsync(TenantId tenantId, List deviceIds); + List findDevicesByIds(List deviceIds); + ListenableFuture> findDevicesByIdsAsync(List deviceIds); void deleteDevicesByTenantId(TenantId tenantId); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/util/DbTypeInfoComponent.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/DbTypeInfoComponent.java new file mode 100644 index 0000000000..a0a0255bb5 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/DbTypeInfoComponent.java @@ -0,0 +1,22 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +public interface DbTypeInfoComponent { + + boolean isLatestTsDaoStoredToSql(); + +} diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/util/DefaultDbTypeInfoComponent.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/DefaultDbTypeInfoComponent.java new file mode 100644 index 0000000000..59f595f35a --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/util/DefaultDbTypeInfoComponent.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.util; + +import lombok.Getter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +public class DefaultDbTypeInfoComponent implements DbTypeInfoComponent { + + @Value("${database.ts_latest.type:sql}") + @Getter + private String latestTsDbType; + + @Override + public boolean isLatestTsDaoStoredToSql() { + return !latestTsDbType.equalsIgnoreCase("cassandra"); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java index 5996801454..7a77c5687e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java @@ -129,6 +129,14 @@ public interface DeviceDao extends Dao, TenantEntityDao, ExportableEntit */ ListenableFuture> findDevicesByTenantIdAndIdsAsync(UUID tenantId, List deviceIds); + /** + * Find devices by devices Ids. + * + * @param deviceIds the device Ids + * @return the list of device objects + */ + List findDevicesByIds(List deviceIds); + /** * Find devices by devices Ids. * diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index 0f76e9a6b9..a4a68f5a79 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -409,6 +409,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService findDevicesByIds(List deviceIds) { + log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds); + validateIds(deviceIds, "Incorrect deviceIds " + deviceIds); + return deviceDao.findDevicesByIds(toUUIDs(deviceIds)); + } + @Override public ListenableFuture> findDevicesByIdsAsync(List deviceIds) { log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java new file mode 100644 index 0000000000..d08ba10b44 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DefaultNativeDeviceRepository.java @@ -0,0 +1,64 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.device; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.domain.Pageable; +import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.support.TransactionTemplate; +import org.thingsboard.server.common.data.DeviceIdInfo; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +@RequiredArgsConstructor +@Repository +@Slf4j +public class DefaultNativeDeviceRepository implements NativeDeviceRepository { + + private final String COUNT_QUERY = "SELECT count(id) FROM device;"; + private final String QUERY = "SELECT tenant_id as tenantId, customer_id as customerId, id as id FROM device ORDER BY created_time ASC LIMIT %s OFFSET %s"; + private final NamedParameterJdbcTemplate jdbcTemplate; + private final TransactionTemplate transactionTemplate; + + @Override + public PageData findDeviceIdInfos(Pageable pageable) { + return transactionTemplate.execute(status -> { + long startTs = System.currentTimeMillis(); + int totalElements = jdbcTemplate.queryForObject(COUNT_QUERY, Collections.emptyMap(), Integer.class); + log.debug("Count query took {} ms", System.currentTimeMillis() - startTs); + startTs = System.currentTimeMillis(); + List> rows = jdbcTemplate.queryForList(String.format(QUERY, pageable.getPageSize(), pageable.getOffset()), Collections.emptyMap()); + log.debug("Main query took {} ms", System.currentTimeMillis() - startTs); + int totalPages = pageable.getPageSize() > 0 ? (int) Math.ceil((float) totalElements / pageable.getPageSize()) : 1; + boolean hasNext = pageable.getPageSize() > 0 && totalElements > pageable.getOffset() + rows.size(); + var data = rows.stream().map(row -> { + UUID id = (UUID) row.get("id"); + var tenantIdObj = row.get("tenantId"); + var customerIdObj = row.get("customerId"); + return new DeviceIdInfo(tenantIdObj != null ? (UUID) tenantIdObj : TenantId.SYS_TENANT_ID.getId(), customerIdObj != null ? (UUID) customerIdObj : null, id); + }).collect(Collectors.toList()); + return new PageData<>(data, totalPages, totalElements, hasNext); + }); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java index 701f9266a9..519d665ae8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java @@ -252,7 +252,4 @@ public interface DeviceRepository extends JpaRepository, Exp @Query("SELECT externalId FROM DeviceEntity WHERE id = :id") UUID getExternalIdById(@Param("id") UUID id); - @Query("SELECT new org.thingsboard.server.common.data.DeviceIdInfo(d.tenantId, d.customerId, d.id) FROM DeviceEntity d") - Page findDeviceIdInfos(Pageable pageable); - } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java index 2858bf8284..2e3fe80448 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java @@ -59,6 +59,9 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao @Autowired private DeviceRepository deviceRepository; + @Autowired + private NativeDeviceRepository nativeDeviceRepository; + @Override protected Class getEntityClass() { return DeviceEntity.class; @@ -112,9 +115,14 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByTenantIdAndIdIn(tenantId, deviceIds))); } + @Override + public List findDevicesByIds(List deviceIds) { + return DaoUtil.convertDataList(deviceRepository.findDevicesByIdIn(deviceIds)); + } + @Override public ListenableFuture> findDevicesByIdsAsync(List deviceIds) { - return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByIdIn(deviceIds))); + return service.submit(() -> findDevicesByIds(deviceIds)); } @Override @@ -313,8 +321,7 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao @Override public PageData findDeviceIdInfos(PageLink pageLink) { log.debug("Try to find tenant device id infos by pageLink [{}]", pageLink); - var page = deviceRepository.findDeviceIdInfos(DaoUtil.toPageable(pageLink)); - return new PageData<>(page.getContent(), page.getTotalPages(), page.getTotalElements(), page.hasNext()); + return nativeDeviceRepository.findDeviceIdInfos(DaoUtil.toPageable(pageLink)); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeDeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeDeviceRepository.java new file mode 100644 index 0000000000..b88b0a4340 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/NativeDeviceRepository.java @@ -0,0 +1,26 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.device; + +import org.springframework.data.domain.Pageable; +import org.thingsboard.server.common.data.DeviceIdInfo; +import org.thingsboard.server.common.data.page.PageData; + +public interface NativeDeviceRepository { + + PageData findDeviceIdInfos(Pageable pageable); + +} diff --git a/docker/tb-node.env b/docker/tb-node.env index ba66757ecc..85a60eb51a 100644 --- a/docker/tb-node.env +++ b/docker/tb-node.env @@ -7,7 +7,5 @@ TRANSPORT_TYPE=remote HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false -TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE=64 - METRICS_ENABLED=true METRICS_ENDPOINTS_EXPOSE=prometheus diff --git a/transport/coap/src/main/resources/tb-coap-transport.yml b/transport/coap/src/main/resources/tb-coap-transport.yml index 516d062589..11f4e91141 100644 --- a/transport/coap/src/main/resources/tb-coap-transport.yml +++ b/transport/coap/src/main/resources/tb-coap-transport.yml @@ -223,7 +223,6 @@ queue: notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" - virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" transport_api: requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}" responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}" diff --git a/transport/http/src/main/resources/tb-http-transport.yml b/transport/http/src/main/resources/tb-http-transport.yml index d42e2bf158..79efbd0a3d 100644 --- a/transport/http/src/main/resources/tb-http-transport.yml +++ b/transport/http/src/main/resources/tb-http-transport.yml @@ -210,7 +210,6 @@ queue: notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" - virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" transport_api: requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}" responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}" diff --git a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml index 914a9a973b..0f28100385 100644 --- a/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml +++ b/transport/lwm2m/src/main/resources/tb-lwm2m-transport.yml @@ -288,7 +288,6 @@ queue: notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" - virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" transport_api: requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}" responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}" diff --git a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml index b11587043d..68a1f71f64 100644 --- a/transport/mqtt/src/main/resources/tb-mqtt-transport.yml +++ b/transport/mqtt/src/main/resources/tb-mqtt-transport.yml @@ -240,7 +240,6 @@ queue: notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" - virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" transport_api: requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}" responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}" diff --git a/transport/snmp/src/main/resources/tb-snmp-transport.yml b/transport/snmp/src/main/resources/tb-snmp-transport.yml index 5beb1d5f3f..2ff2c37a90 100644 --- a/transport/snmp/src/main/resources/tb-snmp-transport.yml +++ b/transport/snmp/src/main/resources/tb-snmp-transport.yml @@ -190,7 +190,6 @@ queue: notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}" partitions: hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}" - virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}" transport_api: requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}" responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"