Major improvement to device state service
This commit is contained in:
parent
499a804e8f
commit
8dc01ea5db
@ -70,6 +70,7 @@ import javax.annotation.PreDestroy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -79,6 +80,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
@ -500,19 +502,19 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
|
||||
try {
|
||||
log.info("Initializing tenant states.");
|
||||
updateLock.lock();
|
||||
try {
|
||||
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
|
||||
List<ListenableFuture<?>> futures = new ArrayList<>();
|
||||
for (Tenant tenant : tenantIterator) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), tenant.getId());
|
||||
if (addedPartitions.contains(tpi)) {
|
||||
if (!myUsageStates.containsKey(tenant.getId()) && tpi.isMyPartition()) {
|
||||
log.debug("[{}] Initializing tenant state.", tenant.getId());
|
||||
futures.add(dbExecutor.submit(() -> {
|
||||
result.computeIfAbsent(tpi, tmp -> new ArrayList<>()).add(dbExecutor.submit(() -> {
|
||||
try {
|
||||
updateTenantState((TenantApiUsageState) getOrFetchState(tenant.getId(), tenant.getId()), tenantProfileCache.get(tenant.getTenantProfileId()));
|
||||
log.debug("[{}] Initialized tenant state.", tenant.getId());
|
||||
@ -526,14 +528,13 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
|
||||
log.debug("[{}][{}] Tenant doesn't belong to current partition. tpi [{}]", tenant.getName(), tenant.getId(), tpi);
|
||||
}
|
||||
}
|
||||
Futures.whenAllComplete(futures);
|
||||
} finally {
|
||||
updateLock.unlock();
|
||||
}
|
||||
log.info("Initialized {} tenant states.", myUsageStates.size());
|
||||
} catch (Exception e) {
|
||||
log.warn("Unknown failure", e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
|
||||
@ -15,9 +15,12 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.partition;
|
||||
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.common.util.DonAsynchron;
|
||||
import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||
@ -25,7 +28,11 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
@ -37,6 +44,7 @@ import java.util.concurrent.Executors;
|
||||
public abstract class AbstractPartitionBasedService<T extends EntityId> extends TbApplicationEventListener<PartitionChangeEvent> {
|
||||
|
||||
protected final ConcurrentMap<TopicPartitionInfo, Set<T>> partitionedEntities = new ConcurrentHashMap<>();
|
||||
protected final ConcurrentMap<TopicPartitionInfo, List<ListenableFuture<?>>> partitionedFetchTasks = new ConcurrentHashMap<>();
|
||||
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
protected ListeningScheduledExecutorService scheduledExecutor;
|
||||
@ -45,7 +53,7 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
|
||||
|
||||
abstract protected String getSchedulerExecutorName();
|
||||
|
||||
abstract protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions);
|
||||
abstract protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions);
|
||||
|
||||
abstract protected void cleanupEntityOnPartitionRemoval(T entityId);
|
||||
|
||||
@ -112,6 +120,10 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
|
||||
removedPartitions.forEach(partition -> {
|
||||
Set<T> entities = partitionedEntities.remove(partition);
|
||||
entities.forEach(this::cleanupEntityOnPartitionRemoval);
|
||||
List<ListenableFuture<?>> fetchTasks = partitionedFetchTasks.remove(partition);
|
||||
if (fetchTasks != null) {
|
||||
fetchTasks.forEach(f -> f.cancel(true));
|
||||
}
|
||||
});
|
||||
|
||||
onRepartitionEvent();
|
||||
@ -119,16 +131,29 @@ public abstract class AbstractPartitionBasedService<T extends EntityId> extends
|
||||
addedPartitions.forEach(tpi -> partitionedEntities.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
|
||||
|
||||
if (!addedPartitions.isEmpty()) {
|
||||
onAddedPartitions(addedPartitions);
|
||||
var fetchTasks = onAddedPartitions(addedPartitions);
|
||||
if (fetchTasks != null && !fetchTasks.isEmpty()) {
|
||||
partitionedFetchTasks.putAll(fetchTasks);
|
||||
List<ListenableFuture<?>> futures = new ArrayList<>();
|
||||
fetchTasks.values().forEach(futures::addAll);
|
||||
DonAsynchron.withCallback(Futures.allAsList(futures),
|
||||
t -> logPartitions(), e -> log.error("Partition fetch task error", e));
|
||||
} else {
|
||||
logPartitions();
|
||||
}
|
||||
} else {
|
||||
logPartitions();
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.warn("[{}] Failed to init entities state from DB", getServiceName(), t);
|
||||
}
|
||||
}
|
||||
|
||||
private void logPartitions() {
|
||||
log.info("[{}] Managing following partitions:", getServiceName());
|
||||
partitionedEntities.forEach((tpi, entities) -> {
|
||||
log.info("[{}][{}]: {} entities", getServiceName(), tpi.getFullTopicName(), entities.size());
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
log.warn("[{}] Failed to init entities state from DB", getServiceName(), t);
|
||||
}
|
||||
}
|
||||
|
||||
protected void onRepartitionEvent() {
|
||||
|
||||
@ -17,9 +17,12 @@ package org.thingsboard.server.service.state;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.Getter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@ -31,8 +34,10 @@ import org.thingsboard.common.util.ThingsBoardThreadFactory;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
|
||||
@ -42,7 +47,13 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
|
||||
import org.thingsboard.server.common.data.kv.TsKvEntry;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageDataIterable;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.query.EntityData;
|
||||
import org.thingsboard.server.common.data.query.EntityDataPageLink;
|
||||
import org.thingsboard.server.common.data.query.EntityDataQuery;
|
||||
import org.thingsboard.server.common.data.query.EntityDataSortOrder;
|
||||
import org.thingsboard.server.common.data.query.EntityKey;
|
||||
import org.thingsboard.server.common.data.query.EntityKeyType;
|
||||
import org.thingsboard.server.common.data.query.EntityListFilter;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgDataType;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
@ -51,10 +62,12 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.device.DeviceService;
|
||||
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
||||
import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
import org.thingsboard.server.queue.util.TbCoreComponent;
|
||||
import org.thingsboard.server.service.partition.AbstractPartitionBasedService;
|
||||
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
|
||||
@ -66,16 +79,19 @@ import javax.annotation.PreDestroy;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.thingsboard.server.common.data.DataConstants.ACTIVITY_EVENT;
|
||||
import static org.thingsboard.server.common.data.DataConstants.CONNECT_EVENT;
|
||||
@ -98,8 +114,28 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
public static final String INACTIVITY_ALARM_TIME = "inactivityAlarmTime";
|
||||
public static final String INACTIVITY_TIMEOUT = "inactivityTimeout";
|
||||
|
||||
private static final List<EntityKey> PERSISTENT_TELEMETRY_KEYS = Arrays.asList(
|
||||
new EntityKey(EntityKeyType.TIME_SERIES, LAST_ACTIVITY_TIME),
|
||||
new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_ALARM_TIME),
|
||||
new EntityKey(EntityKeyType.TIME_SERIES, INACTIVITY_TIMEOUT),
|
||||
new EntityKey(EntityKeyType.TIME_SERIES, ACTIVITY_STATE),
|
||||
new EntityKey(EntityKeyType.TIME_SERIES, LAST_CONNECT_TIME),
|
||||
new EntityKey(EntityKeyType.TIME_SERIES, LAST_DISCONNECT_TIME));
|
||||
|
||||
private static final List<EntityKey> PERSISTENT_ATTRIBUTE_KEYS = Arrays.asList(
|
||||
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_ACTIVITY_TIME),
|
||||
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_ALARM_TIME),
|
||||
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, INACTIVITY_TIMEOUT),
|
||||
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, ACTIVITY_STATE),
|
||||
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_CONNECT_TIME),
|
||||
new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, LAST_DISCONNECT_TIME));
|
||||
|
||||
public static final List<String> PERSISTENT_ATTRIBUTES = Arrays.asList(ACTIVITY_STATE, LAST_CONNECT_TIME,
|
||||
LAST_DISCONNECT_TIME, LAST_ACTIVITY_TIME, INACTIVITY_ALARM_TIME, INACTIVITY_TIMEOUT);
|
||||
private static final List<EntityKey> PERSISTENT_ENTITY_FIELDS = Arrays.asList(
|
||||
new EntityKey(EntityKeyType.ENTITY_FIELD, "name"),
|
||||
new EntityKey(EntityKeyType.ENTITY_FIELD, "type"),
|
||||
new EntityKey(EntityKeyType.ENTITY_FIELD, "createdTime"));
|
||||
|
||||
private final TenantService tenantService;
|
||||
private final DeviceService deviceService;
|
||||
@ -107,6 +143,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
private final TimeseriesService tsService;
|
||||
private final TbClusterService clusterService;
|
||||
private final PartitionService partitionService;
|
||||
private final TbServiceInfoProvider serviceInfoProvider;
|
||||
private final EntityQueryRepository entityQueryRepository;
|
||||
|
||||
private TelemetrySubscriptionService tsSubService;
|
||||
|
||||
@ -126,19 +164,23 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
@Getter
|
||||
private int initFetchPackSize;
|
||||
|
||||
private ExecutorService deviceStateExecutor;
|
||||
private ListeningExecutorService deviceStateExecutor;
|
||||
|
||||
final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
|
||||
|
||||
public DefaultDeviceStateService(TenantService tenantService, DeviceService deviceService,
|
||||
AttributesService attributesService, TimeseriesService tsService,
|
||||
TbClusterService clusterService, PartitionService partitionService) {
|
||||
TbClusterService clusterService, PartitionService partitionService,
|
||||
TbServiceInfoProvider serviceInfoProvider,
|
||||
EntityQueryRepository entityQueryRepository) {
|
||||
this.tenantService = tenantService;
|
||||
this.deviceService = deviceService;
|
||||
this.attributesService = attributesService;
|
||||
this.tsService = tsService;
|
||||
this.clusterService = clusterService;
|
||||
this.partitionService = partitionService;
|
||||
this.serviceInfoProvider = serviceInfoProvider;
|
||||
this.entityQueryRepository = entityQueryRepository;
|
||||
}
|
||||
|
||||
@Autowired
|
||||
@ -149,8 +191,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
super.init();
|
||||
deviceStateExecutor = Executors.newFixedThreadPool(
|
||||
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state"));
|
||||
deviceStateExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(
|
||||
Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("device-state")));
|
||||
scheduledExecutor.scheduleAtFixedRate(this::updateInactivityStateIfExpired, new Random().nextInt(defaultStateCheckIntervalInSec), defaultStateCheckIntervalInSec, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@ -167,7 +209,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
return "Device State";
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected String getSchedulerExecutorName() {
|
||||
return "device-state-scheduled";
|
||||
@ -297,60 +338,34 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||
PageDataIterable<Tenant> tenantIterator = new PageDataIterable<>(tenantService::findTenants, 1024);
|
||||
for (Tenant tenant : tenantIterator) {
|
||||
log.debug("Finding devices for tenant [{}]", tenant.getName());
|
||||
final PageLink pageLink = new PageLink(initFetchPackSize);
|
||||
processPageAndSubmitNextPage(addedPartitions, tenant, pageLink);
|
||||
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
|
||||
PageDataIterable<DeviceIdInfo> deviceIdInfos = new PageDataIterable<>(deviceService::findDeviceIdInfos, 1);
|
||||
Map<TopicPartitionInfo, List<DeviceIdInfo>> tpiDeviceMap = new HashMap<>();
|
||||
|
||||
for (DeviceIdInfo idInfo : deviceIdInfos) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, idInfo.getTenantId(), idInfo.getDeviceId());
|
||||
if (addedPartitions.contains(tpi) && !deviceStates.containsKey(idInfo.getDeviceId())) {
|
||||
tpiDeviceMap.computeIfAbsent(tpi, tmp -> new ArrayList<>()).add(idInfo);
|
||||
}
|
||||
}
|
||||
|
||||
private void processPageAndSubmitNextPage(final Set<TopicPartitionInfo> addedPartitions, final Tenant tenant, final PageLink pageLink) {
|
||||
log.trace("[{}] Process page {} from {}", tenant, pageLink.getPage(), pageLink.getPageSize());
|
||||
List<ListenableFuture<Void>> fetchFutures = new ArrayList<>();
|
||||
PageData<Device> page = deviceService.findDevicesByTenantId(tenant.getId(), pageLink);
|
||||
for (Device device : page.getData()) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenant.getId(), device.getId());
|
||||
if (addedPartitions.contains(tpi) && !deviceStates.containsKey(device.getId())) {
|
||||
log.debug("[{}][{}] Device belong to current partition. tpi [{}]. Fetching state from DB", device.getName(), device.getId(), tpi);
|
||||
ListenableFuture<Void> future = Futures.transform(fetchDeviceState(device), new Function<>() {
|
||||
@Nullable
|
||||
@Override
|
||||
public Void apply(@Nullable DeviceStateData state) {
|
||||
if (state != null) {
|
||||
addDeviceUsingState(tpi, state);
|
||||
checkAndUpdateState(device.getId(), state);
|
||||
} else {
|
||||
log.warn("{}][{}] Fetched null state from DB", device.getName(), device.getId());
|
||||
for (var entry : tpiDeviceMap.entrySet()) {
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
for (List<DeviceIdInfo> partition : Lists.partition(entry.getValue(), initFetchPackSize)) {
|
||||
log.info("[{}] Submit task for device states: {}", entry.getKey(), partition.size());
|
||||
var devicePackFuture = deviceStateExecutor.submit(() -> {
|
||||
var states = fetchDeviceStateData(partition);
|
||||
for (var state : states) {
|
||||
addDeviceUsingState(entry.getKey(), state);
|
||||
checkAndUpdateState(state.getDeviceId(), state);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}, deviceStateExecutor);
|
||||
fetchFutures.add(future);
|
||||
} else {
|
||||
log.debug("[{}][{}] Device doesn't belong to current partition. tpi [{}]", device.getName(), device.getId(), tpi);
|
||||
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
|
||||
});
|
||||
result.computeIfAbsent(entry.getKey(), tmp -> new ArrayList<>()).add(devicePackFuture);
|
||||
}
|
||||
}
|
||||
|
||||
Futures.addCallback(Futures.successfulAsList(fetchFutures), new FutureCallback<>() {
|
||||
@Override
|
||||
public void onSuccess(List<Void> result) {
|
||||
log.trace("[{}] Success init device state from DB for batch size {}", tenant.getId(), result.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
log.warn("[" + tenant.getId() + "] Failed to init device state service from DB", t);
|
||||
log.warn("[{}] Failed to init device state service from DB", tenant.getId(), t);
|
||||
}
|
||||
}, deviceStateExecutor);
|
||||
|
||||
final PageLink nextPageLink = page.hasNext() ? pageLink.nextPageLink() : null;
|
||||
if (nextPageLink != null) {
|
||||
log.trace("[{}] Submit next page {} from {}", tenant, nextPageLink.getPage(), nextPageLink.getPageSize());
|
||||
processPageAndSubmitNextPage(addedPartitions, tenant, nextPageLink);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void checkAndUpdateState(@Nonnull DeviceId deviceId, @Nonnull DeviceStateData state) {
|
||||
@ -376,13 +391,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired() {
|
||||
try {
|
||||
final long ts = System.currentTimeMillis();
|
||||
partitionedEntities.forEach((tpi, deviceIds) -> {
|
||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||
for (DeviceId deviceId : deviceIds) {
|
||||
try {
|
||||
updateInactivityStateIfExpired(ts, deviceId);
|
||||
} catch (Exception e) {
|
||||
log.warn("[{}] Failed to update inactivity state", deviceId, e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
log.warn("Failed to update inactivity states", t);
|
||||
}
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired(long ts, DeviceId deviceId) {
|
||||
@ -540,6 +563,83 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
||||
};
|
||||
}
|
||||
|
||||
private List<DeviceStateData> fetchDeviceStateData(List<DeviceIdInfo> deviceIds) {
|
||||
EntityListFilter ef = new EntityListFilter();
|
||||
ef.setEntityType(EntityType.DEVICE);
|
||||
ef.setEntityList(deviceIds.stream().map(DeviceIdInfo::getDeviceId).map(DeviceId::getId).map(UUID::toString).collect(Collectors.toList()));
|
||||
|
||||
EntityDataQuery query = new EntityDataQuery(ef,
|
||||
new EntityDataPageLink(deviceIds.size(), 0, null, new EntityDataSortOrder(new EntityKey(EntityKeyType.ENTITY_FIELD, "id"))),
|
||||
PERSISTENT_ENTITY_FIELDS,
|
||||
persistToTelemetry ? PERSISTENT_TELEMETRY_KEYS : PERSISTENT_ATTRIBUTE_KEYS, Collections.emptyList());
|
||||
PageData<EntityData> queryResult = entityQueryRepository.findEntityDataByQueryInternal(query);
|
||||
|
||||
Map<EntityId, DeviceIdInfo> deviceIdInfos = deviceIds.stream().collect(Collectors.toMap(DeviceIdInfo::getDeviceId, java.util.function.Function.identity()));
|
||||
|
||||
return queryResult.getData().stream().map(ed -> toDeviceStateData(ed, deviceIdInfos.get(ed.getEntityId()))).collect(Collectors.toList());
|
||||
|
||||
}
|
||||
|
||||
private DeviceStateData toDeviceStateData(EntityData ed, DeviceIdInfo deviceIdInfo) {
|
||||
long lastActivityTime = getEntryValue(ed, getKeyType(), LAST_ACTIVITY_TIME, 0L);
|
||||
long inactivityAlarmTime = getEntryValue(ed, getKeyType(), INACTIVITY_ALARM_TIME, 0L);
|
||||
long inactivityTimeout = getEntryValue(ed, getKeyType(), INACTIVITY_TIMEOUT, TimeUnit.SECONDS.toMillis(defaultInactivityTimeoutInSec));
|
||||
//Actual active state by wall-clock will updated outside this method. This method is only for fetch persistent state
|
||||
final boolean active = getEntryValue(ed, getKeyType(), ACTIVITY_STATE, false);
|
||||
DeviceState deviceState = DeviceState.builder()
|
||||
.active(active)
|
||||
.lastConnectTime(getEntryValue(ed, getKeyType(), LAST_CONNECT_TIME, 0L))
|
||||
.lastDisconnectTime(getEntryValue(ed, getKeyType(), LAST_DISCONNECT_TIME, 0L))
|
||||
.lastActivityTime(lastActivityTime)
|
||||
.lastInactivityAlarmTime(inactivityAlarmTime)
|
||||
.inactivityTimeout(inactivityTimeout)
|
||||
.build();
|
||||
TbMsgMetaData md = new TbMsgMetaData();
|
||||
md.putValue("deviceName", getEntryValue(ed, EntityKeyType.ENTITY_FIELD, "name", ""));
|
||||
md.putValue("deviceType", getEntryValue(ed, EntityKeyType.ENTITY_FIELD, "type", ""));
|
||||
return DeviceStateData.builder()
|
||||
.customerId(deviceIdInfo.getCustomerId())
|
||||
.tenantId(deviceIdInfo.getTenantId())
|
||||
.deviceId(deviceIdInfo.getDeviceId())
|
||||
.deviceCreationTime(getEntryValue(ed, EntityKeyType.ENTITY_FIELD, "createdTime", 0L))
|
||||
.metaData(md)
|
||||
.state(deviceState).build();
|
||||
}
|
||||
|
||||
private EntityKeyType getKeyType() {
|
||||
return persistToTelemetry ? EntityKeyType.TIME_SERIES : EntityKeyType.SERVER_ATTRIBUTE;
|
||||
}
|
||||
|
||||
private String getEntryValue(EntityData ed, EntityKeyType keyType, String keyName, String defaultValue) {
|
||||
return getEntryValue(ed, keyType, keyName, s -> s, defaultValue);
|
||||
}
|
||||
|
||||
private long getEntryValue(EntityData ed, EntityKeyType keyType, String keyName, long defaultValue) {
|
||||
return getEntryValue(ed, keyType, keyName, Long::parseLong, defaultValue);
|
||||
}
|
||||
|
||||
private boolean getEntryValue(EntityData ed, EntityKeyType keyType, String keyName, boolean defaultValue) {
|
||||
return getEntryValue(ed, keyType, keyName, Boolean::parseBoolean, defaultValue);
|
||||
}
|
||||
|
||||
private <T> T getEntryValue(EntityData ed, EntityKeyType entityKeyType, String attributeName, Function<String, T> converter, T defaultValue) {
|
||||
if (ed != null && ed.getLatest() != null) {
|
||||
var map = ed.getLatest().get(entityKeyType);
|
||||
if (map != null) {
|
||||
var value = map.get(attributeName);
|
||||
if (value != null && !StringUtils.isEmpty(value.getValue())) {
|
||||
try {
|
||||
return converter.apply(value.getValue());
|
||||
} catch (Exception e) {
|
||||
return defaultValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
|
||||
private long getEntryValue(List<? extends KvEntry> kvEntries, String attributeName, long defaultValue) {
|
||||
if (kvEntries != null) {
|
||||
for (KvEntry entry : kvEntries) {
|
||||
|
||||
@ -28,6 +28,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||
import org.thingsboard.server.cluster.TbClusterService;
|
||||
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
@ -53,6 +54,8 @@ public class DefaultDeviceStateServiceTest {
|
||||
PartitionService partitionService;
|
||||
@Mock
|
||||
DeviceStateData deviceStateDataMock;
|
||||
@Mock
|
||||
TbServiceInfoProvider serviceInfoProvider;
|
||||
|
||||
DeviceId deviceId = DeviceId.fromString("00797a3b-7aeb-4b5b-b57a-c2a810d0f112");
|
||||
|
||||
@ -60,7 +63,7 @@ public class DefaultDeviceStateServiceTest {
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService));
|
||||
service = spy(new DefaultDeviceStateService(tenantService, deviceService, attributesService, tsService, clusterService, partitionService, serviceInfoProvider, null));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.DeviceInfo;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
@ -66,6 +67,8 @@ public interface DeviceService {
|
||||
|
||||
PageData<DeviceInfo> findDeviceInfosByTenantId(TenantId tenantId, PageLink pageLink);
|
||||
|
||||
PageData<DeviceIdInfo> findDeviceIdInfos(PageLink pageLink);
|
||||
|
||||
PageData<Device> findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink);
|
||||
|
||||
PageData<Device> findDevicesByTenantIdAndTypeAndEmptyOtaPackage(TenantId tenantId, DeviceProfileId deviceProfileId, OtaPackageType type, PageLink pageLink);
|
||||
@ -78,6 +81,8 @@ public interface DeviceService {
|
||||
|
||||
ListenableFuture<List<Device>> findDevicesByTenantIdAndIdsAsync(TenantId tenantId, List<DeviceId> deviceIds);
|
||||
|
||||
ListenableFuture<List<Device>> findDevicesByIdsAsync(List<DeviceId> deviceIds);
|
||||
|
||||
void deleteDevicesByTenantId(TenantId tenantId);
|
||||
|
||||
PageData<Device> findDevicesByTenantIdAndCustomerId(TenantId tenantId, CustomerId customerId, PageLink pageLink);
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
/**
|
||||
* Copyright © 2016-2022 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.common.data;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.UUID;
|
||||
|
||||
@Data
|
||||
@Slf4j
|
||||
public class DeviceIdInfo implements Serializable, HasTenantId {
|
||||
|
||||
private static final long serialVersionUID = 2233745129677581815L;
|
||||
|
||||
private final TenantId tenantId;
|
||||
private final CustomerId customerId;
|
||||
private final DeviceId deviceId;
|
||||
|
||||
public DeviceIdInfo(UUID tenantId, UUID customerId, UUID deviceId) {
|
||||
this.tenantId = new TenantId(tenantId);
|
||||
this.customerId = customerId != null ? new CustomerId(customerId) : null;
|
||||
this.deviceId = new DeviceId(deviceId);
|
||||
}
|
||||
}
|
||||
@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceInfo;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
@ -128,6 +129,14 @@ public interface DeviceDao extends Dao<Device>, TenantEntityDao, ExportableEntit
|
||||
*/
|
||||
ListenableFuture<List<Device>> findDevicesByTenantIdAndIdsAsync(UUID tenantId, List<UUID> deviceIds);
|
||||
|
||||
/**
|
||||
* Find devices by devices Ids.
|
||||
*
|
||||
* @param deviceIds the device Ids
|
||||
* @return the list of device objects
|
||||
*/
|
||||
ListenableFuture<List<Device>> findDevicesByIdsAsync(List<UUID> deviceIds);
|
||||
|
||||
/**
|
||||
* Find devices by tenantId, customerId and page link.
|
||||
*
|
||||
@ -257,4 +266,8 @@ public interface DeviceDao extends Dao<Device>, TenantEntityDao, ExportableEntit
|
||||
* @return the list of device objects
|
||||
*/
|
||||
PageData<Device> findDevicesByTenantIdAndEdgeIdAndType(UUID tenantId, UUID edgeId, String type, PageLink pageLink);
|
||||
|
||||
PageData<DeviceIdInfo> findDeviceIdInfos(PageLink pageLink);
|
||||
|
||||
|
||||
}
|
||||
|
||||
@ -21,7 +21,6 @@ import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.hibernate.exception.ConstraintViolationException;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
@ -39,6 +38,7 @@ import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.EntityView;
|
||||
import org.thingsboard.server.common.data.StringUtils;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
import org.thingsboard.server.common.data.device.DeviceSearchQuery;
|
||||
import org.thingsboard.server.common.data.device.credentials.BasicMqttCredentials;
|
||||
import org.thingsboard.server.common.data.device.data.CoapDeviceTransportConfiguration;
|
||||
@ -346,6 +346,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
|
||||
return deviceDao.findDeviceInfosByTenantId(tenantId.getId(), pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<DeviceIdInfo> findDeviceIdInfos(PageLink pageLink) {
|
||||
log.trace("Executing findTenantDeviceIdPairs, pageLink [{}]", pageLink);
|
||||
validatePageLink(pageLink);
|
||||
return deviceDao.findDeviceIdInfos(pageLink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Device> findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) {
|
||||
log.trace("Executing findDevicesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink);
|
||||
@ -402,6 +409,13 @@ public class DeviceServiceImpl extends AbstractCachedEntityService<DeviceCacheKe
|
||||
return deviceDao.findDevicesByTenantIdAndIdsAsync(tenantId.getId(), toUUIDs(deviceIds));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<List<Device>> findDevicesByIdsAsync(List<DeviceId> deviceIds) {
|
||||
log.trace("Executing findDevicesByIdsAsync, deviceIds [{}]", deviceIds);
|
||||
validateIds(deviceIds, "Incorrect deviceIds " + deviceIds);
|
||||
return deviceDao.findDevicesByIdsAsync(toUUIDs(deviceIds));
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Override
|
||||
public void deleteDevicesByTenantId(TenantId tenantId) {
|
||||
|
||||
@ -21,6 +21,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
|
||||
import org.springframework.data.jpa.repository.Query;
|
||||
import org.springframework.data.repository.query.Param;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
import org.thingsboard.server.dao.ExportableEntityRepository;
|
||||
import org.thingsboard.server.dao.model.sql.DeviceEntity;
|
||||
import org.thingsboard.server.dao.model.sql.DeviceInfoEntity;
|
||||
@ -204,6 +205,8 @@ public interface DeviceRepository extends JpaRepository<DeviceEntity, UUID>, Exp
|
||||
|
||||
List<DeviceEntity> findDevicesByTenantIdAndIdIn(UUID tenantId, List<UUID> deviceIds);
|
||||
|
||||
List<DeviceEntity> findDevicesByIdIn(List<UUID> deviceIds);
|
||||
|
||||
DeviceEntity findByTenantIdAndId(UUID tenantId, UUID id);
|
||||
|
||||
Long countByDeviceProfileId(UUID deviceProfileId);
|
||||
@ -249,4 +252,7 @@ public interface DeviceRepository extends JpaRepository<DeviceEntity, UUID>, Exp
|
||||
@Query("SELECT externalId FROM DeviceEntity WHERE id = :id")
|
||||
UUID getExternalIdById(@Param("id") UUID id);
|
||||
|
||||
@Query("SELECT new org.thingsboard.server.common.data.DeviceIdInfo(d.tenantId, d.customerId, d.id) FROM DeviceEntity d")
|
||||
Page<DeviceIdInfo> findDeviceIdInfos(Pageable pageable);
|
||||
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ import org.thingsboard.server.common.data.DeviceInfo;
|
||||
import org.thingsboard.server.common.data.DeviceTransportType;
|
||||
import org.thingsboard.server.common.data.EntitySubtype;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.ota.OtaPackageType;
|
||||
@ -111,6 +112,11 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
|
||||
return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByTenantIdAndIdIn(tenantId, deviceIds)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<List<Device>> findDevicesByIdsAsync(List<UUID> deviceIds) {
|
||||
return service.submit(() -> DaoUtil.convertDataList(deviceRepository.findDevicesByIdIn(deviceIds)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<Device> findDevicesByTenantIdAndCustomerId(UUID tenantId, UUID customerId, PageLink pageLink) {
|
||||
return DaoUtil.toPageData(
|
||||
@ -304,6 +310,13 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
|
||||
DaoUtil.toPageable(pageLink)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<DeviceIdInfo> findDeviceIdInfos(PageLink pageLink) {
|
||||
log.debug("Try to find tenant device id infos by pageLink [{}]", pageLink);
|
||||
var page = deviceRepository.findDeviceIdInfos(DaoUtil.toPageable(pageLink));
|
||||
return new PageData<>(page.getContent(), page.getTotalPages(), page.getTotalElements(), page.hasNext());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Device findByTenantIdAndExternalId(UUID tenantId, UUID externalId) {
|
||||
return DaoUtil.getData(deviceRepository.findByTenantIdAndExternalId(tenantId, externalId));
|
||||
|
||||
@ -377,11 +377,20 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<EntityData> findEntityDataByQueryInternal(EntityDataQuery query) {
|
||||
return findEntityDataByQuery(null, null, query, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PageData<EntityData> findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query) {
|
||||
return findEntityDataByQuery(tenantId, customerId, query, false);
|
||||
}
|
||||
|
||||
public PageData<EntityData> findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query, boolean ignorePermissionCheck) {
|
||||
return transactionTemplate.execute(status -> {
|
||||
EntityType entityType = resolveEntityType(query.getEntityFilter());
|
||||
QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, customerId, entityType));
|
||||
QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, customerId, entityType, ignorePermissionCheck));
|
||||
EntityDataPageLink pageLink = query.getPageLink();
|
||||
|
||||
List<EntityKeyMapping> mappings = EntityKeyMapping.prepareKeyMapping(query);
|
||||
@ -504,6 +513,9 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
|
||||
}
|
||||
|
||||
private String buildPermissionQuery(QueryContext ctx, EntityFilter entityFilter) {
|
||||
if(ctx.isIgnorePermissionCheck()){
|
||||
return "1=1";
|
||||
}
|
||||
switch (entityFilter.getType()) {
|
||||
case RELATIONS_QUERY:
|
||||
case DEVICE_SEARCH_QUERY:
|
||||
|
||||
@ -28,4 +28,6 @@ public interface EntityQueryRepository {
|
||||
|
||||
PageData<EntityData> findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query);
|
||||
|
||||
PageData<EntityData> findEntityDataByQueryInternal(EntityDataQuery query);
|
||||
|
||||
}
|
||||
|
||||
@ -146,4 +146,8 @@ public class QueryContext implements SqlParameterSource {
|
||||
public EntityType getEntityType() {
|
||||
return securityCtx.getEntityType();
|
||||
}
|
||||
|
||||
public boolean isIgnorePermissionCheck() {
|
||||
return securityCtx.isIgnorePermissionCheck();
|
||||
}
|
||||
}
|
||||
|
||||
@ -30,5 +30,10 @@ public class QuerySecurityContext {
|
||||
private final CustomerId customerId;
|
||||
@Getter
|
||||
private final EntityType entityType;
|
||||
@Getter
|
||||
private final boolean ignorePermissionCheck;
|
||||
|
||||
public QuerySecurityContext(TenantId tenantId, CustomerId customerId, EntityType entityType) {
|
||||
this(tenantId, customerId, entityType, false);
|
||||
}
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user