From 8dc01ea5dbfb69457c81fcc8030e4b3f46f91210 Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Wed, 3 Aug 2022 18:52:10 +0300 Subject: [PATCH] Major improvement to device state service --- .../DefaultTbApiUsageStateService.java | 11 +- .../AbstractPartitionBasedService.java | 39 ++- .../state/DefaultDeviceStateService.java | 230 +++++++++++++----- .../state/DefaultDeviceStateServiceTest.java | 5 +- .../server/dao/device/DeviceService.java | 5 + .../server/common/data/DeviceIdInfo.java | 42 ++++ .../server/dao/device/DeviceDao.java | 13 + .../server/dao/device/DeviceServiceImpl.java | 16 +- .../dao/sql/device/DeviceRepository.java | 6 + .../server/dao/sql/device/JpaDeviceDao.java | 13 + .../query/DefaultEntityQueryRepository.java | 14 +- .../dao/sql/query/EntityQueryRepository.java | 2 + .../server/dao/sql/query/QueryContext.java | 4 + .../dao/sql/query/QuerySecurityContext.java | 5 + 14 files changed, 325 insertions(+), 80 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/DeviceIdInfo.java diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index 7ccfd6fe4d..d5c86a67db 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -70,6 +70,7 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -79,6 +80,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -500,19 +502,19 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } @Override - protected void onAddedPartitions(Set addedPartitions) { + protected Map>> onAddedPartitions(Set addedPartitions) { + var result = new HashMap>>(); try { log.info("Initializing tenant states."); updateLock.lock(); try { PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); - List> futures = new ArrayList<>(); for (Tenant tenant : tenantIterator) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId()); if (addedPartitions.contains(tpi)) { if (!myUsageStates.containsKey(tenant.getId()) && tpi.isMyPartition()) { log.debug("[{}] Initializing tenant state.", tenant.getId()); - futures.add(dbExecutor.submit(() -> { + result.computeIfAbsent(tpi, tmp -> new ArrayList<>()).add(dbExecutor.submit(() -> { try { updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId())); log.debug("[{}] Initialized tenant state.", tenant.getId()); @@ -526,14 +528,13 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService log.debug("[{}][{}] Tenant doesn't belong to current partition. tpi [{}]", tenant.getName(), tenant.getId(), tpi); } } - Futures.whenAllComplete(futures); } finally { updateLock.unlock(); } - log.info("Initialized {} tenant states.", myUsageStates.size()); } catch (Exception e) { log.warn("Unknown failure", e); } + return result; } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java index bf0bc429c8..71469605fd 100644 --- a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java +++ b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java @@ -15,9 +15,12 @@ */ package org.thingsboard.server.service.partition; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.queue.ServiceType; @@ -25,7 +28,11 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.queue.discovery.TbApplicationEventListener; import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -37,6 +44,7 @@ import java.util.concurrent.Executors; public abstract class AbstractPartitionBasedService extends TbApplicationEventListener { protected final ConcurrentMap> partitionedEntities = new ConcurrentHashMap<>(); + protected final ConcurrentMap>> partitionedFetchTasks = new ConcurrentHashMap<>(); final Queue> subscribeQueue = new ConcurrentLinkedQueue<>(); protected ListeningScheduledExecutorService scheduledExecutor; @@ -45,7 +53,7 @@ public abstract class AbstractPartitionBasedService extends abstract protected String getSchedulerExecutorName(); - abstract protected void onAddedPartitions(Set addedPartitions); + abstract protected Map>> onAddedPartitions(Set addedPartitions); abstract protected void cleanupEntityOnPartitionRemoval(T entityId); @@ -112,6 +120,10 @@ public abstract class AbstractPartitionBasedService extends removedPartitions.forEach(partition -> { Set entities = partitionedEntities.remove(partition); entities.forEach(this::cleanupEntityOnPartitionRemoval); + List> fetchTasks = partitionedFetchTasks.remove(partition); + if (fetchTasks != null) { + fetchTasks.forEach(f -> f.cancel(true)); + } }); onRepartitionEvent(); @@ -119,18 +131,31 @@ public abstract class AbstractPartitionBasedService extends addedPartitions.forEach(tpi -> partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); if (!addedPartitions.isEmpty()) { - onAddedPartitions(addedPartitions); + var fetchTasks = onAddedPartitions(addedPartitions); + if (fetchTasks != null && !fetchTasks.isEmpty()) { + partitionedFetchTasks.putAll(fetchTasks); + List> futures = new ArrayList<>(); + fetchTasks.values().forEach(futures::addAll); + DonAsynchron.withCallback(Futures.allAsList(futures), + t -> logPartitions(), e -> log.error("Partition fetch task error", e)); + } else { + logPartitions(); + } + } else { + logPartitions(); } - - log.info("[{}] Managing following partitions:", getServiceName()); - partitionedEntities.forEach((tpi, entities) -> { - log.info("[{}][{}]: {} entities", getServiceName(), tpi.getFullTopicName(), entities.size()); - }); } catch (Throwable t) { log.warn("[{}] Failed to init entities state from DB", getServiceName(), t); } } + private void logPartitions() { + log.info("[{}] Managing following partitions:", getServiceName()); + partitionedEntities.forEach((tpi, entities) -> { + log.info("[{}][{}]: {} entities", getServiceName(), tpi.getFullTopicName(), entities.size()); + }); + } + protected void onRepartitionEvent() { } diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index d74b0fca87..eee098d186 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -17,9 +17,12 @@ package org.thingsboard.server.service.state; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Function; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -31,8 +34,10 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.DeviceIdInfo; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; @@ -42,7 +47,13 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.page.PageDataIterable; -import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.query.EntityData; +import org.thingsboard.server.common.data.query.EntityDataPageLink; +import org.thingsboard.server.common.data.query.EntityDataQuery; +import org.thingsboard.server.common.data.query.EntityDataSortOrder; +import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.EntityListFilter; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgDataType; import org.thingsboard.server.common.msg.TbMsgMetaData; @@ -51,10 +62,12 @@ import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.device.DeviceService; +import org.thingsboard.server.dao.sql.query.EntityQueryRepository; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.partition.AbstractPartitionBasedService; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; @@ -66,16 +79,19 @@ import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT; import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT; @@ -98,8 +114,28 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService PERSISTENT_TELEMETRY_KEYS = Arrays.asList( + new EntityKey(EntityKeyType.TIME_SERIES, LAST_ACTIVITY_TIME), + new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_ALARM_TIME), + new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT), + new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE), + new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME), + new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME)); + + private static final List PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList( + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME), + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_ALARM_TIME), + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT), + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, ACTIVITY_STATE), + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_CONNECT_TIME), + new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_DISCONNECT_TIME)); + public static final List PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME, LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT); + private static final List PERSISTENT_ENTITY_FIELDS = Arrays.asList( + new EntityKey(EntityKeyType.ENTITY_FIELD, "name"), + new EntityKey(EntityKeyType.ENTITY_FIELD, "type"), + new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime")); private final TenantService tenantService; private final DeviceService deviceService; @@ -107,6 +143,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceStates = new ConcurrentHashMap<>(); public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService, AttributesService attributesService, TimeseriesService tsService, - TbClusterService clusterService, PartitionService partitionService) { + TbClusterService clusterService, PartitionService partitionService, + TbServiceInfoProvider serviceInfoProvider, + EntityQueryRepository entityQueryRepository) { this.tenantService = tenantService; this.deviceService = deviceService; this.attributesService = attributesService; this.tsService = tsService; this.clusterService = clusterService; this.partitionService = partitionService; + this.serviceInfoProvider = serviceInfoProvider; + this.entityQueryRepository = entityQueryRepository; } @Autowired @@ -149,8 +191,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService addedPartitions) { - PageDataIterable tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024); - for (Tenant tenant : tenantIterator) { - log.debug("Finding devices for tenant [{}]", tenant.getName()); - final PageLink pageLink = new PageLink(initFetchPackSize); - processPageAndSubmitNextPage(addedPartitions, tenant, pageLink); - } - } + protected Map>> onAddedPartitions(Set addedPartitions) { + var result = new HashMap>>(); + PageDataIterable deviceIdInfos = new PageDataIterable<>(deviceService::findDeviceIdInfos, 1); + Map> tpiDeviceMap = new HashMap<>(); - private void processPageAndSubmitNextPage(final Set addedPartitions, final Tenant tenant, final PageLink pageLink) { - log.trace("[{}] Process page {} from {}", tenant, pageLink.getPage(), pageLink.getPageSize()); - List> fetchFutures = new ArrayList<>(); - PageData page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink); - for (Device device : page.getData()) { - TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId()); - if (addedPartitions.contains(tpi) && !deviceStates.containsKey(device.getId())) { - log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi); - ListenableFuture future = Futures.transform(fetchDeviceState(device), new Function<>() { - @Nullable - @Override - public Void apply(@Nullable DeviceStateData state) { - if (state != null) { - addDeviceUsingState(tpi, state); - checkAndUpdateState(device.getId(), state); - } else { - log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId()); - } - return null; + for (DeviceIdInfo idInfo : deviceIdInfos) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, idInfo.getTenantId(), idInfo.getDeviceId()); + if (addedPartitions.contains(tpi) && !deviceStates.containsKey(idInfo.getDeviceId())) { + tpiDeviceMap.computeIfAbsent(tpi, tmp -> new ArrayList<>()).add(idInfo); + } + } + + for (var entry : tpiDeviceMap.entrySet()) { + AtomicInteger counter = new AtomicInteger(0); + for (List partition : Lists.partition(entry.getValue(), initFetchPackSize)) { + log.info("[{}] Submit task for device states: {}", entry.getKey(), partition.size()); + var devicePackFuture = deviceStateExecutor.submit(() -> { + var states = fetchDeviceStateData(partition); + for (var state : states) { + addDeviceUsingState(entry.getKey(), state); + checkAndUpdateState(state.getDeviceId(), state); } - }, deviceStateExecutor); - fetchFutures.add(future); - } else { - log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi); + log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size()); + }); + result.computeIfAbsent(entry.getKey(), tmp -> new ArrayList<>()).add(devicePackFuture); } } - - Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback<>() { - @Override - public void onSuccess(List result) { - log.trace("[{}] Success init device state from DB for batch size {}", tenant.getId(), result.size()); - } - - @Override - public void onFailure(Throwable t) { - log.warn("[" + tenant.getId() + "] Failed to init device state service from DB", t); - log.warn("[{}] Failed to init device state service from DB", tenant.getId(), t); - } - }, deviceStateExecutor); - - final PageLink nextPageLink = page.hasNext() ? pageLink.nextPageLink() : null; - if (nextPageLink != null) { - log.trace("[{}] Submit next page {} from {}", tenant, nextPageLink.getPage(), nextPageLink.getPageSize()); - processPageAndSubmitNextPage(addedPartitions, tenant, nextPageLink); - } + return result; } void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) { @@ -376,13 +391,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService { - log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size()); - for (DeviceId deviceId : deviceIds) { - updateInactivityStateIfExpired(ts, deviceId); - } - }); + try { + final long ts = System.currentTimeMillis(); + partitionedEntities.forEach((tpi, deviceIds) -> { + log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size()); + for (DeviceId deviceId : deviceIds) { + try { + updateInactivityStateIfExpired(ts, deviceId); + } catch (Exception e) { + log.warn("[{}] Failed to update inactivity state", deviceId, e); + } + } + }); + } catch (Throwable t) { + log.warn("Failed to update inactivity states", t); + } } void updateInactivityStateIfExpired(long ts, DeviceId deviceId) { @@ -540,6 +563,83 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService fetchDeviceStateData(List deviceIds) { + EntityListFilter ef = new EntityListFilter(); + ef.setEntityType(EntityType.DEVICE); + ef.setEntityList(deviceIds.stream().map(DeviceIdInfo::getDeviceId).map(DeviceId::getId).map(UUID::toString).collect(Collectors.toList())); + + EntityDataQuery query = new EntityDataQuery(ef, + new EntityDataPageLink(deviceIds.size(), 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, "id"))), + PERSISTENT_ENTITY_FIELDS, + persistToTelemetry ? PERSISTENT_TELEMETRY_KEYS : PERSISTENT_ATTRIBUTE_KEYS, Collections.emptyList()); + PageData queryResult = entityQueryRepository.findEntityDataByQueryInternal(query); + + Map deviceIdInfos = deviceIds.stream().collect(Collectors.toMap(DeviceIdInfo::getDeviceId, java.util.function.Function.identity())); + + return queryResult.getData().stream().map(ed -> toDeviceStateData(ed, deviceIdInfos.get(ed.getEntityId()))).collect(Collectors.toList()); + + } + + private DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) { + long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L); + long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L); + long inactivityTimeout = getEntryValue(ed, getKeyType(), INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec)); + //Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state + final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false); + DeviceState deviceState = DeviceState.builder() + .active(active) + .lastConnectTime(getEntryValue(ed, getKeyType(), LAST_CONNECT_TIME, 0L)) + .lastDisconnectTime(getEntryValue(ed, getKeyType(), LAST_DISCONNECT_TIME, 0L)) + .lastActivityTime(lastActivityTime) + .lastInactivityAlarmTime(inactivityAlarmTime) + .inactivityTimeout(inactivityTimeout) + .build(); + TbMsgMetaData md = new TbMsgMetaData(); + md.putValue("deviceName", getEntryValue(ed, EntityKeyType.ENTITY_FIELD, "name", "")); + md.putValue("deviceType", getEntryValue(ed, EntityKeyType.ENTITY_FIELD, "type", "")); + return DeviceStateData.builder() + .customerId(deviceIdInfo.getCustomerId()) + .tenantId(deviceIdInfo.getTenantId()) + .deviceId(deviceIdInfo.getDeviceId()) + .deviceCreationTime(getEntryValue(ed, EntityKeyType.ENTITY_FIELD, "createdTime", 0L)) + .metaData(md) + .state(deviceState).build(); + } + + private EntityKeyType getKeyType() { + return persistToTelemetry ? EntityKeyType.TIME_SERIES : EntityKeyType.SERVER_ATTRIBUTE; + } + + private String getEntryValue(EntityData ed, EntityKeyType keyType, String keyName, String defaultValue) { + return getEntryValue(ed, keyType, keyName, s -> s, defaultValue); + } + + private long getEntryValue(EntityData ed, EntityKeyType keyType, String keyName, long defaultValue) { + return getEntryValue(ed, keyType, keyName, Long::parseLong, defaultValue); + } + + private boolean getEntryValue(EntityData ed, EntityKeyType keyType, String keyName, boolean defaultValue) { + return getEntryValue(ed, keyType, keyName, Boolean::parseBoolean, defaultValue); + } + + private T getEntryValue(EntityData ed, EntityKeyType entityKeyType, String attributeName, Function converter, T defaultValue) { + if (ed != null && ed.getLatest() != null) { + var map = ed.getLatest().get(entityKeyType); + if (map != null) { + var value = map.get(attributeName); + if (value != null && !StringUtils.isEmpty(value.getValue())) { + try { + return converter.apply(value.getValue()); + } catch (Exception e) { + return defaultValue; + } + } + } + } + return defaultValue; + } + + private long getEntryValue(List kvEntries, String attributeName, long defaultValue) { if (kvEntries != null) { for (KvEntry entry : kvEntries) { diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index 89a19d0e58..17e562fccf 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -28,6 +28,7 @@ import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.cluster.TbClusterService; +import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -53,6 +54,8 @@ public class DefaultDeviceStateServiceTest { PartitionService partitionService; @Mock DeviceStateData deviceStateDataMock; + @Mock + TbServiceInfoProvider serviceInfoProvider; DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112"); @@ -60,7 +63,7 @@ public class DefaultDeviceStateServiceTest { @Before public void setUp() { - service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService)); + service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null)); } @Test diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java index e187ac8bf9..8f0c4b6451 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/device/DeviceService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.DeviceInfo; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntitySubtype; +import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; @@ -66,6 +67,8 @@ public interface DeviceService { PageData findDeviceInfosByTenantId(TenantId tenantId, PageLink pageLink); + PageData findDeviceIdInfos(PageLink pageLink); + PageData findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); PageData findDevicesByTenantIdAndTypeAndEmptyOtaPackage(TenantId tenantId, DeviceProfileId deviceProfileId, OtaPackageType type, PageLink pageLink); @@ -78,6 +81,8 @@ public interface DeviceService { ListenableFuture> findDevicesByTenantIdAndIdsAsync(TenantId tenantId, List deviceIds); + ListenableFuture> findDevicesByIdsAsync(List deviceIds); + void deleteDevicesByTenantId(TenantId tenantId); PageData findDevicesByTenantIdAndCustomerId(TenantId tenantId, CustomerId customerId, PageLink pageLink); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DeviceIdInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceIdInfo.java new file mode 100644 index 0000000000..156b8edbb7 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DeviceIdInfo.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.io.Serializable; +import java.util.UUID; + +@Data +@Slf4j +public class DeviceIdInfo implements Serializable, HasTenantId { + + private static final long serialVersionUID = 2233745129677581815L; + + private final TenantId tenantId; + private final CustomerId customerId; + private final DeviceId deviceId; + + public DeviceIdInfo(UUID tenantId, UUID customerId, UUID deviceId) { + this.tenantId = new TenantId(tenantId); + this.customerId = customerId != null ? new CustomerId(customerId) : null; + this.deviceId = new DeviceId(deviceId); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java index c96f196d44..5996801454 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceDao.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceInfo; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntitySubtype; +import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; @@ -128,6 +129,14 @@ public interface DeviceDao extends Dao, TenantEntityDao, ExportableEntit */ ListenableFuture> findDevicesByTenantIdAndIdsAsync(UUID tenantId, List deviceIds); + /** + * Find devices by devices Ids. + * + * @param deviceIds the device Ids + * @return the list of device objects + */ + ListenableFuture> findDevicesByIdsAsync(List deviceIds); + /** * Find devices by tenantId, customerId and page link. * @@ -257,4 +266,8 @@ public interface DeviceDao extends Dao, TenantEntityDao, ExportableEntit * @return the list of device objects */ PageData findDevicesByTenantIdAndEdgeIdAndType(UUID tenantId, UUID edgeId, String type, PageLink pageLink); + + PageData findDeviceIdInfos(PageLink pageLink); + + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java index e7b10b8a0e..0f76e9a6b9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/device/DeviceServiceImpl.java @@ -21,7 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; -import org.hibernate.exception.ConstraintViolationException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -39,6 +38,7 @@ import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.device.DeviceSearchQuery; import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials; import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration; @@ -346,6 +346,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService findDeviceIdInfos(PageLink pageLink) { + log.trace("Executing findTenantDeviceIdPairs, pageLink [{}]", pageLink); + validatePageLink(pageLink); + return deviceDao.findDeviceIdInfos(pageLink); + } + @Override public PageData findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { log.trace("Executing findDevicesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); @@ -402,6 +409,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService> findDevicesByIdsAsync(List deviceIds) { + log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds); + validateIds(deviceIds, "Incorrect deviceIds " + deviceIds); + return deviceDao.findDevicesByIdsAsync(toUUIDs(deviceIds)); + } + @Transactional @Override public void deleteDevicesByTenantId(TenantId tenantId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java index 405dd58b2e..701f9266a9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/DeviceRepository.java @@ -21,6 +21,7 @@ import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.thingsboard.server.common.data.DeviceTransportType; +import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.dao.ExportableEntityRepository; import org.thingsboard.server.dao.model.sql.DeviceEntity; import org.thingsboard.server.dao.model.sql.DeviceInfoEntity; @@ -204,6 +205,8 @@ public interface DeviceRepository extends JpaRepository, Exp List findDevicesByTenantIdAndIdIn(UUID tenantId, List deviceIds); + List findDevicesByIdIn(List deviceIds); + DeviceEntity findByTenantIdAndId(UUID tenantId, UUID id); Long countByDeviceProfileId(UUID deviceProfileId); @@ -249,4 +252,7 @@ public interface DeviceRepository extends JpaRepository, Exp @Query("SELECT externalId FROM DeviceEntity WHERE id = :id") UUID getExternalIdById(@Param("id") UUID id); + @Query("SELECT new org.thingsboard.server.common.data.DeviceIdInfo(d.tenantId, d.customerId, d.id) FROM DeviceEntity d") + Page findDeviceIdInfos(Pageable pageable); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java index 8513e440b7..2858bf8284 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/device/JpaDeviceDao.java @@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.DeviceInfo; import org.thingsboard.server.common.data.DeviceTransportType; import org.thingsboard.server.common.data.EntitySubtype; import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.DeviceIdInfo; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.ota.OtaPackageType; @@ -111,6 +112,11 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByTenantIdAndIdIn(tenantId, deviceIds))); } + @Override + public ListenableFuture> findDevicesByIdsAsync(List deviceIds) { + return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByIdIn(deviceIds))); + } + @Override public PageData findDevicesByTenantIdAndCustomerId(UUID tenantId, UUID customerId, PageLink pageLink) { return DaoUtil.toPageData( @@ -304,6 +310,13 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao DaoUtil.toPageable(pageLink))); } + @Override + public PageData findDeviceIdInfos(PageLink pageLink) { + log.debug("Try to find tenant device id infos by pageLink [{}]", pageLink); + var page = deviceRepository.findDeviceIdInfos(DaoUtil.toPageable(pageLink)); + return new PageData<>(page.getContent(), page.getTotalPages(), page.getTotalElements(), page.hasNext()); + } + @Override public Device findByTenantIdAndExternalId(UUID tenantId, UUID externalId) { return DaoUtil.getData(deviceRepository.findByTenantIdAndExternalId(tenantId, externalId)); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 8d6af88c59..dde5f35f28 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -377,11 +377,20 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { } } + @Override + public PageData findEntityDataByQueryInternal(EntityDataQuery query) { + return findEntityDataByQuery(null, null, query, true); + } + @Override public PageData findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query) { + return findEntityDataByQuery(tenantId, customerId, query, false); + } + + public PageData findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query, boolean ignorePermissionCheck) { return transactionTemplate.execute(status -> { EntityType entityType = resolveEntityType(query.getEntityFilter()); - QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, customerId, entityType)); + QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, customerId, entityType, ignorePermissionCheck)); EntityDataPageLink pageLink = query.getPageLink(); List mappings = EntityKeyMapping.prepareKeyMapping(query); @@ -504,6 +513,9 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { } private String buildPermissionQuery(QueryContext ctx, EntityFilter entityFilter) { + if(ctx.isIgnorePermissionCheck()){ + return "1=1"; + } switch (entityFilter.getType()) { case RELATIONS_QUERY: case DEVICE_SEARCH_QUERY: diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityQueryRepository.java index e350896e66..264c1b0722 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityQueryRepository.java @@ -28,4 +28,6 @@ public interface EntityQueryRepository { PageData findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query); + PageData findEntityDataByQueryInternal(EntityDataQuery query); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/QueryContext.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/QueryContext.java index 4fa74e3ac0..012e9b7761 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/QueryContext.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/QueryContext.java @@ -146,4 +146,8 @@ public class QueryContext implements SqlParameterSource { public EntityType getEntityType() { return securityCtx.getEntityType(); } + + public boolean isIgnorePermissionCheck() { + return securityCtx.isIgnorePermissionCheck(); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/QuerySecurityContext.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/QuerySecurityContext.java index 72da81786a..eb2caf627f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/QuerySecurityContext.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/QuerySecurityContext.java @@ -30,5 +30,10 @@ public class QuerySecurityContext { private final CustomerId customerId; @Getter private final EntityType entityType; + @Getter + private final boolean ignorePermissionCheck; + public QuerySecurityContext(TenantId tenantId, CustomerId customerId, EntityType entityType) { + this(tenantId, customerId, entityType, false); + } } \ No newline at end of file