Added additional check that device belongs to current node before adding to states map. Added clean up in case device doesnt belong to this node anymore
This commit is contained in:
parent
7be8572364
commit
2d4831af39
@ -381,11 +381,11 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
}
|
||||
|
||||
private void reportSessionOpen() {
|
||||
systemContext.getDeviceStateService().onDeviceConnect(deviceId);
|
||||
systemContext.getDeviceStateService().onDeviceConnect(tenantId, deviceId);
|
||||
}
|
||||
|
||||
private void reportSessionClose() {
|
||||
systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
|
||||
systemContext.getDeviceStateService().onDeviceDisconnect(tenantId, deviceId);
|
||||
}
|
||||
|
||||
private void handleGetAttributesRequest(TbActorCtx context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
|
||||
@ -590,7 +590,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
if (sessions.size() == 1) {
|
||||
reportSessionOpen();
|
||||
}
|
||||
systemContext.getDeviceStateService().onDeviceActivity(deviceId, System.currentTimeMillis());
|
||||
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
|
||||
dumpSessions();
|
||||
} else if (msg.getEvent() == SessionEvent.CLOSED) {
|
||||
log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
|
||||
@ -620,7 +620,7 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
|
||||
if (subscriptionInfo.getRpcSubscription()) {
|
||||
rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
|
||||
}
|
||||
systemContext.getDeviceStateService().onDeviceActivity(deviceId, subscriptionInfo.getLastActivityTime());
|
||||
systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime());
|
||||
dumpSessions();
|
||||
}
|
||||
|
||||
|
||||
@ -137,7 +137,6 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
private ExecutorService deviceStateExecutor;
|
||||
private final ConcurrentMap<TopicPartitionInfo, Set<DeviceId>> partitionedDevices = new ConcurrentHashMap<>();
|
||||
final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<DeviceId, Long> deviceLastSavedActivity = new ConcurrentHashMap<>();
|
||||
|
||||
final Queue<Set<TopicPartitionInfo>> subscribeQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@ -192,7 +191,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceConnect(DeviceId deviceId) {
|
||||
public void onDeviceConnect(TenantId tenantId, DeviceId deviceId) {
|
||||
log.trace("on Device Connect [{}]", deviceId.getId());
|
||||
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||
long ts = System.currentTimeMillis();
|
||||
@ -200,23 +199,23 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
save(deviceId, LAST_CONNECT_TIME, ts);
|
||||
pushRuleEngineMessage(stateData, CONNECT_EVENT);
|
||||
checkAndUpdateState(deviceId, stateData);
|
||||
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceActivity(DeviceId deviceId, long lastReportedActivity) {
|
||||
public void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivity) {
|
||||
log.trace("on Device Activity [{}], lastReportedActivity [{}]", deviceId.getId(), lastReportedActivity);
|
||||
long lastSavedActivity = deviceLastSavedActivity.getOrDefault(deviceId, 0L);
|
||||
if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) {
|
||||
final DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||
if (lastReportedActivity > 0 && lastReportedActivity > stateData.getState().getLastActivityTime()) {
|
||||
updateActivityState(deviceId, stateData, lastReportedActivity);
|
||||
}
|
||||
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
|
||||
}
|
||||
|
||||
void updateActivityState(DeviceId deviceId, DeviceStateData stateData, long lastReportedActivity) {
|
||||
log.trace("updateActivityState - fetched state {} for device {}, lastReportedActivity {}", stateData, deviceId, lastReportedActivity);
|
||||
if (stateData != null) {
|
||||
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
|
||||
deviceLastSavedActivity.put(deviceId, lastReportedActivity);
|
||||
DeviceState state = stateData.getState();
|
||||
state.setLastActivityTime(lastReportedActivity);
|
||||
if (!state.isActive()) {
|
||||
@ -225,21 +224,23 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
|
||||
}
|
||||
} else {
|
||||
log.warn("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
|
||||
log.debug("updateActivityState - fetched state IN NULL for device {}, lastReportedActivity {}", deviceId, lastReportedActivity);
|
||||
cleanUpDeviceStateMap(deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceDisconnect(DeviceId deviceId) {
|
||||
public void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId) {
|
||||
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||
long ts = System.currentTimeMillis();
|
||||
stateData.getState().setLastDisconnectTime(ts);
|
||||
save(deviceId, LAST_DISCONNECT_TIME, ts);
|
||||
pushRuleEngineMessage(stateData, DISCONNECT_EVENT);
|
||||
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout) {
|
||||
public void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout) {
|
||||
if (inactivityTimeout <= 0L) {
|
||||
return;
|
||||
}
|
||||
@ -247,6 +248,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
|
||||
stateData.getState().setInactivityTimeout(inactivityTimeout);
|
||||
checkAndUpdateState(deviceId, stateData);
|
||||
cleanDeviceStateIfBelongsExternalPartition(tenantId, deviceId);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -283,13 +285,11 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
}, deviceStateExecutor);
|
||||
} else if (proto.getUpdated()) {
|
||||
DeviceStateData stateData = getOrFetchDeviceStateData(device.getId());
|
||||
if (stateData != null) {
|
||||
TbMsgMetaData md = new TbMsgMetaData();
|
||||
md.putValue("deviceName", device.getName());
|
||||
md.putValue("deviceType", device.getType());
|
||||
stateData.setMetaData(md);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
//Device was probably deleted while message was in queue;
|
||||
callback.onSuccess();
|
||||
@ -356,10 +356,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
// We no longer manage current partition of devices;
|
||||
removedPartitions.forEach(partition -> {
|
||||
Set<DeviceId> devices = partitionedDevices.remove(partition);
|
||||
devices.forEach(deviceId -> {
|
||||
deviceStates.remove(deviceId);
|
||||
deviceLastSavedActivity.remove(deviceId);
|
||||
});
|
||||
devices.forEach(this::cleanUpDeviceStateMap);
|
||||
});
|
||||
|
||||
addedPartitions.forEach(tpi -> partitionedDevices.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet()));
|
||||
@ -463,11 +460,12 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
|
||||
void updateInactivityStateIfExpired() {
|
||||
final long ts = System.currentTimeMillis();
|
||||
log.debug("Calculating state updates for {} devices", deviceStates.size());
|
||||
Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
|
||||
partitionedDevices.forEach((tpi, deviceIds) -> {
|
||||
log.debug("Calculating state updates. tpi {} for {} devices", tpi.getFullTopicName(), deviceIds.size());
|
||||
for (DeviceId deviceId : deviceIds) {
|
||||
updateInactivityStateIfExpired(ts, deviceId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void updateInactivityStateIfExpired(long ts, DeviceId deviceId) {
|
||||
@ -488,8 +486,7 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
}
|
||||
} else {
|
||||
log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
|
||||
deviceStates.remove(deviceId);
|
||||
deviceLastSavedActivity.remove(deviceId);
|
||||
cleanUpDeviceStateMap(deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
@ -522,6 +519,15 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanDeviceStateIfBelongsExternalPartition(TenantId tenantId, final DeviceId deviceId) {
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||
if (!partitionedDevices.containsKey(tpi)) {
|
||||
cleanUpDeviceStateMap(deviceId);
|
||||
log.debug("[{}][{}] device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
|
||||
, tenantId, deviceId, tpi.getFullTopicName());
|
||||
}
|
||||
}
|
||||
|
||||
private void sendDeviceEvent(TenantId tenantId, DeviceId deviceId, boolean added, boolean updated, boolean deleted) {
|
||||
TransportProtos.DeviceStateServiceMsgProto.Builder builder = TransportProtos.DeviceStateServiceMsgProto.newBuilder();
|
||||
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
|
||||
@ -536,13 +542,16 @@ public class DefaultDeviceStateService extends TbApplicationEventListener<Partit
|
||||
}
|
||||
|
||||
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
|
||||
deviceStates.remove(deviceId);
|
||||
deviceLastSavedActivity.remove(deviceId);
|
||||
cleanUpDeviceStateMap(deviceId);
|
||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||
Set<DeviceId> deviceIdSet = partitionedDevices.get(tpi);
|
||||
deviceIdSet.remove(deviceId);
|
||||
}
|
||||
|
||||
private void cleanUpDeviceStateMap(DeviceId deviceId) {
|
||||
deviceStates.remove(deviceId);
|
||||
}
|
||||
|
||||
private ListenableFuture<DeviceStateData> fetchDeviceState(Device device) {
|
||||
ListenableFuture<DeviceStateData> future;
|
||||
if (persistToTelemetry) {
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.service.state;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||
@ -33,13 +34,13 @@ public interface DeviceStateService extends ApplicationListener<PartitionChangeE
|
||||
|
||||
void onDeviceDeleted(Device device);
|
||||
|
||||
void onDeviceConnect(DeviceId deviceId);
|
||||
void onDeviceConnect(TenantId tenantId, DeviceId deviceId);
|
||||
|
||||
void onDeviceActivity(DeviceId deviceId, long lastReportedActivityTime);
|
||||
void onDeviceActivity(TenantId tenantId, DeviceId deviceId, long lastReportedActivityTime);
|
||||
|
||||
void onDeviceDisconnect(DeviceId deviceId);
|
||||
void onDeviceDisconnect(TenantId tenantId, DeviceId deviceId);
|
||||
|
||||
void onDeviceInactivityTimeoutUpdate(DeviceId deviceId, long inactivityTimeout);
|
||||
void onDeviceInactivityTimeoutUpdate(TenantId tenantId, DeviceId deviceId, long inactivityTimeout);
|
||||
|
||||
void onQueueMsg(TransportProtos.DeviceStateServiceMsgProto proto, TbCallback bytes);
|
||||
|
||||
|
||||
@ -224,7 +224,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
return subscriptionUpdate;
|
||||
});
|
||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||
updateDeviceInactivityTimeout(entityId, ts);
|
||||
updateDeviceInactivityTimeout(tenantId, entityId, ts);
|
||||
}
|
||||
callback.onSuccess();
|
||||
}
|
||||
@ -259,7 +259,7 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
});
|
||||
if (entityId.getEntityType() == EntityType.DEVICE) {
|
||||
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) {
|
||||
updateDeviceInactivityTimeout(entityId, attributes);
|
||||
updateDeviceInactivityTimeout(tenantId, entityId, attributes);
|
||||
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
|
||||
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
|
||||
new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
|
||||
@ -269,10 +269,10 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
|
||||
callback.onSuccess();
|
||||
}
|
||||
|
||||
private void updateDeviceInactivityTimeout(EntityId entityId, List<? extends KvEntry> kvEntries) {
|
||||
private void updateDeviceInactivityTimeout(TenantId tenantId, EntityId entityId, List<? extends KvEntry> kvEntries) {
|
||||
for (KvEntry kvEntry : kvEntries) {
|
||||
if (kvEntry.getKey().equals(DefaultDeviceStateService.INACTIVITY_TIMEOUT)) {
|
||||
deviceStateService.onDeviceInactivityTimeoutUpdate(new DeviceId(entityId.getId()), kvEntry.getLongValue().orElse(0L));
|
||||
deviceStateService.onDeviceInactivityTimeoutUpdate(tenantId, new DeviceId(entityId.getId()), kvEntry.getLongValue().orElse(0L));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user