Device State performance improvements
This commit is contained in:
parent
ac024aee28
commit
31124311a3
@ -35,6 +35,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
@ -116,8 +117,9 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
|
|||||||
|
|
||||||
log.info("[{}] REMOVED PARTITIONS: {}", getServiceName(), removedPartitions);
|
log.info("[{}] REMOVED PARTITIONS: {}", getServiceName(), removedPartitions);
|
||||||
|
|
||||||
|
boolean partitionListChanged = false;
|
||||||
// We no longer manage current partition of entities;
|
// We no longer manage current partition of entities;
|
||||||
removedPartitions.forEach(partition -> {
|
for (var partition : removedPartitions) {
|
||||||
Set<T> entities = partitionedEntities.remove(partition);
|
Set<T> entities = partitionedEntities.remove(partition);
|
||||||
if (entities != null) {
|
if (entities != null) {
|
||||||
entities.forEach(this::cleanupEntityOnPartitionRemoval);
|
entities.forEach(this::cleanupEntityOnPartitionRemoval);
|
||||||
@ -126,7 +128,8 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
|
|||||||
if (fetchTasks != null) {
|
if (fetchTasks != null) {
|
||||||
fetchTasks.forEach(f -> f.cancel(true));
|
fetchTasks.forEach(f -> f.cancel(true));
|
||||||
}
|
}
|
||||||
});
|
partitionListChanged = true;
|
||||||
|
}
|
||||||
|
|
||||||
onRepartitionEvent();
|
onRepartitionEvent();
|
||||||
|
|
||||||
@ -136,21 +139,30 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
|
|||||||
var fetchTasks = onAddedPartitions(addedPartitions);
|
var fetchTasks = onAddedPartitions(addedPartitions);
|
||||||
if (fetchTasks != null && !fetchTasks.isEmpty()) {
|
if (fetchTasks != null && !fetchTasks.isEmpty()) {
|
||||||
partitionedFetchTasks.putAll(fetchTasks);
|
partitionedFetchTasks.putAll(fetchTasks);
|
||||||
List<ListenableFuture<?>> futures = new ArrayList<>();
|
|
||||||
fetchTasks.values().forEach(futures::addAll);
|
|
||||||
DonAsynchron.withCallback(Futures.allAsList(futures),
|
|
||||||
t -> logPartitions(), e -> log.error("Partition fetch task error", e));
|
|
||||||
} else {
|
|
||||||
logPartitions();
|
|
||||||
}
|
}
|
||||||
} else {
|
partitionListChanged = true;
|
||||||
logPartitions();
|
}
|
||||||
|
|
||||||
|
if (partitionListChanged) {
|
||||||
|
List<ListenableFuture<?>> partitionFetchFutures = new ArrayList<>();
|
||||||
|
partitionedFetchTasks.values().forEach(partitionFetchFutures::addAll);
|
||||||
|
DonAsynchron.withCallback(Futures.allAsList(partitionFetchFutures), t -> logPartitions(), this::logFailure);
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
log.warn("[{}] Failed to init entities state from DB", getServiceName(), t);
|
log.warn("[{}] Failed to init entities state from DB", getServiceName(), t);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logFailure(Throwable e) {
|
||||||
|
if (e instanceof CancellationException) {
|
||||||
|
//Probably this is fine and happens due to re-balancing.
|
||||||
|
log.trace("Partition fetch task error", e);
|
||||||
|
} else {
|
||||||
|
log.error("Partition fetch task error", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
private void logPartitions() {
|
private void logPartitions() {
|
||||||
log.info("[{}] Managing following partitions:", getServiceName());
|
log.info("[{}] Managing following partitions:", getServiceName());
|
||||||
partitionedEntities.forEach((tpi, entities) -> {
|
partitionedEntities.forEach((tpi, entities) -> {
|
||||||
|
|||||||
@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Value;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.util.StringUtils;
|
import org.springframework.util.StringUtils;
|
||||||
import org.thingsboard.common.util.JacksonUtil;
|
import org.thingsboard.common.util.JacksonUtil;
|
||||||
|
import org.thingsboard.common.util.ThingsBoardExecutors;
|
||||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.DataConstants;
|
import org.thingsboard.server.common.data.DataConstants;
|
||||||
@ -64,6 +65,7 @@ import org.thingsboard.server.dao.device.DeviceService;
|
|||||||
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
||||||
import org.thingsboard.server.dao.tenant.TenantService;
|
import org.thingsboard.server.dao.tenant.TenantService;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
|
import org.thingsboard.server.dao.util.DbTypeInfoComponent;
|
||||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||||
@ -81,6 +83,7 @@ import java.util.Collections;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
@ -89,6 +92,7 @@ import java.util.concurrent.ConcurrentMap;
|
|||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@ -144,6 +148,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
private final PartitionService partitionService;
|
private final PartitionService partitionService;
|
||||||
private final TbServiceInfoProvider serviceInfoProvider;
|
private final TbServiceInfoProvider serviceInfoProvider;
|
||||||
private final EntityQueryRepository entityQueryRepository;
|
private final EntityQueryRepository entityQueryRepository;
|
||||||
|
private final DbTypeInfoComponent dbTypeInfoComponent;
|
||||||
|
|
||||||
private TelemetrySubscriptionService tsSubService;
|
private TelemetrySubscriptionService tsSubService;
|
||||||
|
|
||||||
@ -171,7 +176,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
AttributesService attributesService, TimeseriesService tsService,
|
AttributesService attributesService, TimeseriesService tsService,
|
||||||
TbClusterService clusterService, PartitionService partitionService,
|
TbClusterService clusterService, PartitionService partitionService,
|
||||||
TbServiceInfoProvider serviceInfoProvider,
|
TbServiceInfoProvider serviceInfoProvider,
|
||||||
EntityQueryRepository entityQueryRepository) {
|
EntityQueryRepository entityQueryRepository,
|
||||||
|
DbTypeInfoComponent dbTypeInfoComponent) {
|
||||||
this.tenantService = tenantService;
|
this.tenantService = tenantService;
|
||||||
this.deviceService = deviceService;
|
this.deviceService = deviceService;
|
||||||
this.attributesService = attributesService;
|
this.attributesService = attributesService;
|
||||||
@ -180,6 +186,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
this.partitionService = partitionService;
|
this.partitionService = partitionService;
|
||||||
this.serviceInfoProvider = serviceInfoProvider;
|
this.serviceInfoProvider = serviceInfoProvider;
|
||||||
this.entityQueryRepository = entityQueryRepository;
|
this.entityQueryRepository = entityQueryRepository;
|
||||||
|
this.dbTypeInfoComponent = dbTypeInfoComponent;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -190,8 +197,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
@PostConstruct
|
@PostConstruct
|
||||||
public void init() {
|
public void init() {
|
||||||
super.init();
|
super.init();
|
||||||
deviceStateExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(
|
deviceStateExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newWorkStealingPool(
|
||||||
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state")));
|
Math.max(4, Runtime.getRuntime().availableProcessors()), "device-state"));
|
||||||
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -351,10 +358,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
|
|
||||||
for (var entry : tpiDeviceMap.entrySet()) {
|
for (var entry : tpiDeviceMap.entrySet()) {
|
||||||
AtomicInteger counter = new AtomicInteger(0);
|
AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
// hard-coded limit of 1000 is due to the Entity Data Query limitations and should not be changed.
|
||||||
for (List<DeviceIdInfo> partition : Lists.partition(entry.getValue(), 1000)) {
|
for (List<DeviceIdInfo> partition : Lists.partition(entry.getValue(), 1000)) {
|
||||||
log.info("[{}] Submit task for device states: {}", entry.getKey(), partition.size());
|
log.info("[{}] Submit task for device states: {}", entry.getKey(), partition.size());
|
||||||
var devicePackFuture = deviceStateExecutor.submit(() -> {
|
var devicePackFuture = deviceStateExecutor.submit(() -> {
|
||||||
var states = fetchDeviceStateData(partition);
|
List<DeviceStateData> states;
|
||||||
|
if (persistToTelemetry && !dbTypeInfoComponent.isLatestTsDaoStoredToSql()) {
|
||||||
|
states = fetchDeviceStateDataUsingSeparateRequests(partition);
|
||||||
|
} else {
|
||||||
|
states = fetchDeviceStateDataUsingEntityDataQuery(partition);
|
||||||
|
}
|
||||||
for (var state : states) {
|
for (var state : states) {
|
||||||
addDeviceUsingState(entry.getKey(), state);
|
addDeviceUsingState(entry.getKey(), state);
|
||||||
checkAndUpdateState(state.getDeviceId(), state);
|
checkAndUpdateState(state.getDeviceId(), state);
|
||||||
@ -445,10 +458,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
if (deviceStateData != null) {
|
if (deviceStateData != null) {
|
||||||
return deviceStateData;
|
return deviceStateData;
|
||||||
}
|
}
|
||||||
return fetchDeviceStateData(deviceId);
|
return fetchDeviceStateDataUsingEntityDataQuery(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
DeviceStateData fetchDeviceStateData(final DeviceId deviceId) {
|
DeviceStateData fetchDeviceStateDataUsingEntityDataQuery(final DeviceId deviceId) {
|
||||||
final Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
|
final Device device = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, deviceId);
|
||||||
if (device == null) {
|
if (device == null) {
|
||||||
log.warn("[{}] Failed to fetch device by Id!", deviceId);
|
log.warn("[{}] Failed to fetch device by Id!", deviceId);
|
||||||
@ -562,7 +575,30 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<DeviceStateData> fetchDeviceStateData(List<DeviceIdInfo> deviceIds) {
|
private List<DeviceStateData> fetchDeviceStateDataUsingSeparateRequests(List<DeviceIdInfo> deviceIds) {
|
||||||
|
List<Device> devices = deviceService.findDevicesByIds(deviceIds.stream().map(DeviceIdInfo::getDeviceId).collect(Collectors.toList()));
|
||||||
|
List<ListenableFuture<DeviceStateData>> deviceStateFutures = new ArrayList<>();
|
||||||
|
for (Device device : devices) {
|
||||||
|
deviceStateFutures.add(fetchDeviceState(device));
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
List<DeviceStateData> result = Futures.successfulAsList(deviceStateFutures).get(5, TimeUnit.MINUTES);
|
||||||
|
boolean success = true;
|
||||||
|
for (int i = 0; i < result.size(); i++) {
|
||||||
|
success = false;
|
||||||
|
if (result.get(i) == null) {
|
||||||
|
DeviceIdInfo deviceIdInfo = deviceIds.get(i);
|
||||||
|
log.warn("[{}][{}] Failed to initialized device state due to:", deviceIdInfo.getTenantId(), deviceIdInfo.getDeviceId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return success ? result : result.stream().filter(Objects::nonNull).collect(Collectors.toList());
|
||||||
|
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||||
|
log.warn("Failed to initialized device state futures for ids: {} due to:", deviceIds, e);
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<DeviceStateData> fetchDeviceStateDataUsingEntityDataQuery(List<DeviceIdInfo> deviceIds) {
|
||||||
EntityListFilter ef = new EntityListFilter();
|
EntityListFilter ef = new EntityListFilter();
|
||||||
ef.setEntityType(EntityType.DEVICE);
|
ef.setEntityType(EntityType.DEVICE);
|
||||||
ef.setEntityList(deviceIds.stream().map(DeviceIdInfo::getDeviceId).map(DeviceId::getId).map(UUID::toString).collect(Collectors.toList()));
|
ef.setEntityList(deviceIds.stream().map(DeviceIdInfo::getDeviceId).map(DeviceId::getId).map(UUID::toString).collect(Collectors.toList()));
|
||||||
|
|||||||
@ -63,7 +63,7 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null));
|
service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -71,16 +71,16 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
service.deviceStates.put(deviceId, deviceStateDataMock);
|
service.deviceStates.put(deviceId, deviceStateDataMock);
|
||||||
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
|
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
|
||||||
assertThat(deviceStateData, is(deviceStateDataMock));
|
assertThat(deviceStateData, is(deviceStateDataMock));
|
||||||
Mockito.verify(service, never()).fetchDeviceStateData(deviceId);
|
Mockito.verify(service, never()).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() {
|
public void givenDeviceIdWithoutDeviceStateInMap_whenGetOrFetchDeviceStateData_thenFetchDeviceStateData() {
|
||||||
service.deviceStates.clear();
|
service.deviceStates.clear();
|
||||||
willReturn(deviceStateDataMock).given(service).fetchDeviceStateData(deviceId);
|
willReturn(deviceStateDataMock).given(service).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
|
||||||
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
|
DeviceStateData deviceStateData = service.getOrFetchDeviceStateData(deviceId);
|
||||||
assertThat(deviceStateData, is(deviceStateDataMock));
|
assertThat(deviceStateData, is(deviceStateDataMock));
|
||||||
Mockito.verify(service, times(1)).fetchDeviceStateData(deviceId);
|
Mockito.verify(service, times(1)).fetchDeviceStateDataUsingEntityDataQuery(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -81,6 +81,8 @@ public interface DeviceService {
|
|||||||
|
|
||||||
ListenableFuture<List<Device>> findDevicesByTenantIdAndIdsAsync(TenantId tenantId, List<DeviceId> deviceIds);
|
ListenableFuture<List<Device>> findDevicesByTenantIdAndIdsAsync(TenantId tenantId, List<DeviceId> deviceIds);
|
||||||
|
|
||||||
|
List<Device> findDevicesByIds(List<DeviceId> deviceIds);
|
||||||
|
|
||||||
ListenableFuture<List<Device>> findDevicesByIdsAsync(List<DeviceId> deviceIds);
|
ListenableFuture<List<Device>> findDevicesByIdsAsync(List<DeviceId> deviceIds);
|
||||||
|
|
||||||
void deleteDevicesByTenantId(TenantId tenantId);
|
void deleteDevicesByTenantId(TenantId tenantId);
|
||||||
|
|||||||
@ -0,0 +1,22 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.dao.util;
|
||||||
|
|
||||||
|
public interface DbTypeInfoComponent {
|
||||||
|
|
||||||
|
boolean isLatestTsDaoStoredToSql();
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,33 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.dao.util;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class DefaultDbTypeInfoComponent implements DbTypeInfoComponent {
|
||||||
|
|
||||||
|
@Value("${database.ts_latest.type:sql}")
|
||||||
|
@Getter
|
||||||
|
private String latestTsDbType;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isLatestTsDaoStoredToSql() {
|
||||||
|
return !latestTsDbType.equalsIgnoreCase("cassandra");
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -129,6 +129,14 @@ public interface DeviceDao extends Dao<Device>, TenantEntityDao, ExportableEntit
|
|||||||
*/
|
*/
|
||||||
ListenableFuture<List<Device>> findDevicesByTenantIdAndIdsAsync(UUID tenantId, List<UUID> deviceIds);
|
ListenableFuture<List<Device>> findDevicesByTenantIdAndIdsAsync(UUID tenantId, List<UUID> deviceIds);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Find devices by devices Ids.
|
||||||
|
*
|
||||||
|
* @param deviceIds the device Ids
|
||||||
|
* @return the list of device objects
|
||||||
|
*/
|
||||||
|
List<Device> findDevicesByIds(List<UUID> deviceIds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Find devices by devices Ids.
|
* Find devices by devices Ids.
|
||||||
*
|
*
|
||||||
|
|||||||
@ -409,6 +409,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
|
|||||||
return deviceDao.findDevicesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(deviceIds));
|
return deviceDao.findDevicesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(deviceIds));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Device> findDevicesByIds(List<DeviceId> deviceIds) {
|
||||||
|
log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds);
|
||||||
|
validateIds(deviceIds, "Incorrect deviceIds " + deviceIds);
|
||||||
|
return deviceDao.findDevicesByIds(toUUIDs(deviceIds));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<Device>> findDevicesByIdsAsync(List<DeviceId> deviceIds) {
|
public ListenableFuture<List<Device>> findDevicesByIdsAsync(List<DeviceId> deviceIds) {
|
||||||
log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds);
|
log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds);
|
||||||
|
|||||||
@ -0,0 +1,64 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.dao.sql.device;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.data.domain.Pageable;
|
||||||
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
||||||
|
import org.springframework.stereotype.Repository;
|
||||||
|
import org.springframework.transaction.support.TransactionTemplate;
|
||||||
|
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||||
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
@Repository
|
||||||
|
@Slf4j
|
||||||
|
public class DefaultNativeDeviceRepository implements NativeDeviceRepository {
|
||||||
|
|
||||||
|
private final String COUNT_QUERY = "SELECT count(id) FROM device;";
|
||||||
|
private final String QUERY = "SELECT tenant_id as tenantId, customer_id as customerId, id as id FROM device ORDER BY created_time ASC LIMIT %s OFFSET %s";
|
||||||
|
private final NamedParameterJdbcTemplate jdbcTemplate;
|
||||||
|
private final TransactionTemplate transactionTemplate;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PageData<DeviceIdInfo> findDeviceIdInfos(Pageable pageable) {
|
||||||
|
return transactionTemplate.execute(status -> {
|
||||||
|
long startTs = System.currentTimeMillis();
|
||||||
|
int totalElements = jdbcTemplate.queryForObject(COUNT_QUERY, Collections.emptyMap(), Integer.class);
|
||||||
|
log.debug("Count query took {} ms", System.currentTimeMillis() - startTs);
|
||||||
|
startTs = System.currentTimeMillis();
|
||||||
|
List<Map<String, Object>> rows = jdbcTemplate.queryForList(String.format(QUERY, pageable.getPageSize(), pageable.getOffset()), Collections.emptyMap());
|
||||||
|
log.debug("Main query took {} ms", System.currentTimeMillis() - startTs);
|
||||||
|
int totalPages = pageable.getPageSize() > 0 ? (int) Math.ceil((float) totalElements / pageable.getPageSize()) : 1;
|
||||||
|
boolean hasNext = pageable.getPageSize() > 0 && totalElements > pageable.getOffset() + rows.size();
|
||||||
|
var data = rows.stream().map(row -> {
|
||||||
|
UUID id = (UUID) row.get("id");
|
||||||
|
var tenantIdObj = row.get("tenantId");
|
||||||
|
var customerIdObj = row.get("customerId");
|
||||||
|
return new DeviceIdInfo(tenantIdObj != null ? (UUID) tenantIdObj : TenantId.SYS_TENANT_ID.getId(), customerIdObj != null ? (UUID) customerIdObj : null, id);
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
return new PageData<>(data, totalPages, totalElements, hasNext);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -252,7 +252,4 @@ public interface DeviceRepository extends JpaRepository<DeviceEntity, UUID>, Exp
|
|||||||
@Query("SELECT externalId FROM DeviceEntity WHERE id = :id")
|
@Query("SELECT externalId FROM DeviceEntity WHERE id = :id")
|
||||||
UUID getExternalIdById(@Param("id") UUID id);
|
UUID getExternalIdById(@Param("id") UUID id);
|
||||||
|
|
||||||
@Query("SELECT new org.thingsboard.server.common.data.DeviceIdInfo(d.tenantId, d.customerId, d.id) FROM DeviceEntity d")
|
|
||||||
Page<DeviceIdInfo> findDeviceIdInfos(Pageable pageable);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -59,6 +59,9 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
|
|||||||
@Autowired
|
@Autowired
|
||||||
private DeviceRepository deviceRepository;
|
private DeviceRepository deviceRepository;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private NativeDeviceRepository nativeDeviceRepository;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Class<DeviceEntity> getEntityClass() {
|
protected Class<DeviceEntity> getEntityClass() {
|
||||||
return DeviceEntity.class;
|
return DeviceEntity.class;
|
||||||
@ -112,9 +115,14 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
|
|||||||
return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByTenantIdAndIdIn(tenantId, deviceIds)));
|
return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByTenantIdAndIdIn(tenantId, deviceIds)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Device> findDevicesByIds(List<UUID> deviceIds) {
|
||||||
|
return DaoUtil.convertDataList(deviceRepository.findDevicesByIdIn(deviceIds));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<List<Device>> findDevicesByIdsAsync(List<UUID> deviceIds) {
|
public ListenableFuture<List<Device>> findDevicesByIdsAsync(List<UUID> deviceIds) {
|
||||||
return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByIdIn(deviceIds)));
|
return service.submit(() -> findDevicesByIds(deviceIds));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -313,8 +321,7 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
|
|||||||
@Override
|
@Override
|
||||||
public PageData<DeviceIdInfo> findDeviceIdInfos(PageLink pageLink) {
|
public PageData<DeviceIdInfo> findDeviceIdInfos(PageLink pageLink) {
|
||||||
log.debug("Try to find tenant device id infos by pageLink [{}]", pageLink);
|
log.debug("Try to find tenant device id infos by pageLink [{}]", pageLink);
|
||||||
var page = deviceRepository.findDeviceIdInfos(DaoUtil.toPageable(pageLink));
|
return nativeDeviceRepository.findDeviceIdInfos(DaoUtil.toPageable(pageLink));
|
||||||
return new PageData<>(page.getContent(), page.getTotalPages(), page.getTotalElements(), page.hasNext());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@ -0,0 +1,26 @@
|
|||||||
|
/**
|
||||||
|
* Copyright © 2016-2022 The Thingsboard Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.thingsboard.server.dao.sql.device;
|
||||||
|
|
||||||
|
import org.springframework.data.domain.Pageable;
|
||||||
|
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||||
|
import org.thingsboard.server.common.data.page.PageData;
|
||||||
|
|
||||||
|
public interface NativeDeviceRepository {
|
||||||
|
|
||||||
|
PageData<DeviceIdInfo> findDeviceIdInfos(Pageable pageable);
|
||||||
|
|
||||||
|
}
|
||||||
@ -7,7 +7,5 @@ TRANSPORT_TYPE=remote
|
|||||||
|
|
||||||
HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false
|
HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE=false
|
||||||
|
|
||||||
TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE=64
|
|
||||||
|
|
||||||
METRICS_ENABLED=true
|
METRICS_ENABLED=true
|
||||||
METRICS_ENDPOINTS_EXPOSE=prometheus
|
METRICS_ENDPOINTS_EXPOSE=prometheus
|
||||||
|
|||||||
@ -223,7 +223,6 @@ queue:
|
|||||||
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||||
partitions:
|
partitions:
|
||||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
||||||
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
|
|
||||||
transport_api:
|
transport_api:
|
||||||
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
||||||
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
||||||
|
|||||||
@ -210,7 +210,6 @@ queue:
|
|||||||
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||||
partitions:
|
partitions:
|
||||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
||||||
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
|
|
||||||
transport_api:
|
transport_api:
|
||||||
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
||||||
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
||||||
|
|||||||
@ -288,7 +288,6 @@ queue:
|
|||||||
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||||
partitions:
|
partitions:
|
||||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
||||||
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
|
|
||||||
transport_api:
|
transport_api:
|
||||||
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
||||||
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
||||||
|
|||||||
@ -240,7 +240,6 @@ queue:
|
|||||||
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||||
partitions:
|
partitions:
|
||||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
||||||
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
|
|
||||||
transport_api:
|
transport_api:
|
||||||
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
||||||
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
||||||
|
|||||||
@ -190,7 +190,6 @@ queue:
|
|||||||
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
notifications: "${TB_QUEUE_RABBIT_MQ_NOTIFICATIONS_QUEUE_PROPERTIES:x-max-length-bytes:1048576000;x-message-ttl:604800000}"
|
||||||
partitions:
|
partitions:
|
||||||
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
hash_function_name: "${TB_QUEUE_PARTITIONS_HASH_FUNCTION_NAME:murmur3_128}"
|
||||||
virtual_nodes_size: "${TB_QUEUE_PARTITIONS_VIRTUAL_NODES_SIZE:16}"
|
|
||||||
transport_api:
|
transport_api:
|
||||||
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
requests_topic: "${TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC:tb_transport.api.requests}"
|
||||||
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
responses_topic: "${TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC:tb_transport.api.responses}"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user