Improvements to the DeviceStateService
This commit is contained in:
parent
c4b1f9ea7a
commit
dac7c5250f
@ -762,7 +762,7 @@ public abstract class BaseController {
|
|||||||
String scope = extractParameter(String.class, 0, additionalInfo);
|
String scope = extractParameter(String.class, 0, additionalInfo);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo);
|
List<AttributeKvEntry> attributes = extractParameter(List.class, 1, additionalInfo);
|
||||||
metaData.putValue("scope", scope);
|
metaData.putValue(DataConstants.SCOPE, scope);
|
||||||
if (attributes != null) {
|
if (attributes != null) {
|
||||||
for (AttributeKvEntry attr : attributes) {
|
for (AttributeKvEntry attr : attributes) {
|
||||||
addKvEntry(entityNode, attr);
|
addKvEntry(entityNode, attr);
|
||||||
@ -772,7 +772,7 @@ public abstract class BaseController {
|
|||||||
String scope = extractParameter(String.class, 0, additionalInfo);
|
String scope = extractParameter(String.class, 0, additionalInfo);
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
List<String> keys = extractParameter(List.class, 1, additionalInfo);
|
List<String> keys = extractParameter(List.class, 1, additionalInfo);
|
||||||
metaData.putValue("scope", scope);
|
metaData.putValue(DataConstants.SCOPE, scope);
|
||||||
ArrayNode attrsArrayNode = entityNode.putArray("attributes");
|
ArrayNode attrsArrayNode = entityNode.putArray("attributes");
|
||||||
if (keys != null) {
|
if (keys != null) {
|
||||||
keys.forEach(attrsArrayNode::add);
|
keys.forEach(attrsArrayNode::add);
|
||||||
|
|||||||
@ -206,8 +206,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
|
|||||||
if (!state.isActive()) {
|
if (!state.isActive()) {
|
||||||
state.setActive(true);
|
state.setActive(true);
|
||||||
save(deviceId, ACTIVITY_STATE, state.isActive());
|
save(deviceId, ACTIVITY_STATE, state.isActive());
|
||||||
stateData.getMetaData().putValue("scope", SERVER_SCOPE);
|
|
||||||
stateData.getMetaData().putValue(DataConstants.PERSIST_STATE_TO_TELEMETRY, String.valueOf(persistToTelemetry));
|
|
||||||
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
|
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -386,7 +384,6 @@ public class DefaultDeviceStateService implements DeviceStateService {
|
|||||||
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
|
state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
|
||||||
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
|
if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime()) && stateData.getDeviceCreationTime() + state.getInactivityTimeout() < ts) {
|
||||||
state.setLastInactivityAlarmTime(ts);
|
state.setLastInactivityAlarmTime(ts);
|
||||||
stateData.getMetaData().putValue(DataConstants.PERSIST_STATE_TO_TELEMETRY, String.valueOf(persistToTelemetry));
|
|
||||||
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
|
pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
|
||||||
save(deviceId, INACTIVITY_ALARM_TIME, ts);
|
save(deviceId, INACTIVITY_ALARM_TIME, ts);
|
||||||
save(deviceId, ACTIVITY_STATE, state.isActive());
|
save(deviceId, ACTIVITY_STATE, state.isActive());
|
||||||
@ -449,7 +446,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
|
private <T extends KvEntry> Function<List<T>, DeviceStateData> extractDeviceStateData(Device device) {
|
||||||
return new Function<List<T>, DeviceStateData>() {
|
return new Function<>() {
|
||||||
@Nullable
|
@Nullable
|
||||||
@Override
|
@Override
|
||||||
public DeviceStateData apply(@Nullable List<T> data) {
|
public DeviceStateData apply(@Nullable List<T> data) {
|
||||||
@ -505,7 +502,11 @@ public class DefaultDeviceStateService implements DeviceStateService {
|
|||||||
} else {
|
} else {
|
||||||
data = JacksonUtil.toString(state);
|
data = JacksonUtil.toString(state);
|
||||||
}
|
}
|
||||||
TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), stateData.getMetaData().copy(), TbMsgDataType.JSON, data);
|
TbMsgMetaData md = stateData.getMetaData().copy();
|
||||||
|
if(!persistToTelemetry){
|
||||||
|
md.putValue(DataConstants.SCOPE, SERVER_SCOPE);
|
||||||
|
}
|
||||||
|
TbMsg tbMsg = TbMsg.newMsg(msgType, stateData.getDeviceId(), md, TbMsgDataType.JSON, data);
|
||||||
clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
|
clusterService.pushMsgToRuleEngine(stateData.getTenantId(), stateData.getDeviceId(), tbMsg, null);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
|
log.warn("[{}] Failed to push inactivity alarm: {}", stateData.getDeviceId(), state, e);
|
||||||
|
|||||||
@ -24,6 +24,7 @@ public class DataConstants {
|
|||||||
public static final String CUSTOMER = "CUSTOMER";
|
public static final String CUSTOMER = "CUSTOMER";
|
||||||
public static final String DEVICE = "DEVICE";
|
public static final String DEVICE = "DEVICE";
|
||||||
|
|
||||||
|
public static final String SCOPE = "scope";
|
||||||
public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
|
public static final String CLIENT_SCOPE = "CLIENT_SCOPE";
|
||||||
public static final String SERVER_SCOPE = "SERVER_SCOPE";
|
public static final String SERVER_SCOPE = "SERVER_SCOPE";
|
||||||
public static final String SHARED_SCOPE = "SHARED_SCOPE";
|
public static final String SHARED_SCOPE = "SHARED_SCOPE";
|
||||||
@ -51,7 +52,6 @@ public class DataConstants {
|
|||||||
public static final String CONNECT_EVENT = "CONNECT_EVENT";
|
public static final String CONNECT_EVENT = "CONNECT_EVENT";
|
||||||
public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT";
|
public static final String DISCONNECT_EVENT = "DISCONNECT_EVENT";
|
||||||
public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT";
|
public static final String ACTIVITY_EVENT = "ACTIVITY_EVENT";
|
||||||
public static final String PERSIST_STATE_TO_TELEMETRY = "persistStateToTelemetry";
|
|
||||||
|
|
||||||
public static final String ENTITY_CREATED = "ENTITY_CREATED";
|
public static final String ENTITY_CREATED = "ENTITY_CREATED";
|
||||||
public static final String ENTITY_UPDATED = "ENTITY_UPDATED";
|
public static final String ENTITY_UPDATED = "ENTITY_UPDATED";
|
||||||
|
|||||||
@ -76,7 +76,7 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
|
|||||||
if (!msg.getMetaData().getData().isEmpty()) {
|
if (!msg.getMetaData().getData().isEmpty()) {
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
|
String scope = msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ?
|
||||||
DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue("scope");
|
DataConstants.CLIENT_SCOPE : msg.getMetaData().getValue(DataConstants.SCOPE);
|
||||||
|
|
||||||
ListenableFuture<List<EntityView>> entityViewsFuture =
|
ListenableFuture<List<EntityView>> entityViewsFuture =
|
||||||
ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
|
ctx.getEntityViewService().findEntityViewsByTenantIdAndEntityIdAsync(ctx.getTenantId(), msg.getOriginator());
|
||||||
|
|||||||
@ -161,11 +161,12 @@ class DeviceState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
|
private boolean processDeviceActivityEvent(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
|
||||||
String deviceStateIsPersistedToTelemetry = msg.getMetaData().getValue(DataConstants.PERSIST_STATE_TO_TELEMETRY);
|
String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
|
||||||
if (Boolean.TRUE.toString().equals(deviceStateIsPersistedToTelemetry)) {
|
if (StringUtils.isEmpty(scope)) {
|
||||||
return processTelemetry(ctx, msg);
|
return processTelemetry(ctx, msg);
|
||||||
|
} else {
|
||||||
|
return processAttributes(ctx, msg, scope);
|
||||||
}
|
}
|
||||||
return processAttributes(ctx, msg, msg.getMetaData().getValue("scope"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean processAlarmClearNotification(TbContext ctx, TbMsg msg) {
|
private boolean processAlarmClearNotification(TbContext ctx, TbMsg msg) {
|
||||||
@ -191,7 +192,7 @@ class DeviceState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private boolean processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
|
private boolean processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
|
||||||
String scope = msg.getMetaData().getValue("scope");
|
String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
|
||||||
if (StringUtils.isEmpty(scope)) {
|
if (StringUtils.isEmpty(scope)) {
|
||||||
scope = DataConstants.CLIENT_SCOPE;
|
scope = DataConstants.CLIENT_SCOPE;
|
||||||
}
|
}
|
||||||
@ -202,7 +203,7 @@ class DeviceState {
|
|||||||
boolean stateChanged = false;
|
boolean stateChanged = false;
|
||||||
List<String> keys = new ArrayList<>();
|
List<String> keys = new ArrayList<>();
|
||||||
new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString()));
|
new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString()));
|
||||||
String scope = msg.getMetaData().getValue("scope");
|
String scope = msg.getMetaData().getValue(DataConstants.SCOPE);
|
||||||
if (StringUtils.isEmpty(scope)) {
|
if (StringUtils.isEmpty(scope)) {
|
||||||
scope = DataConstants.CLIENT_SCOPE;
|
scope = DataConstants.CLIENT_SCOPE;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user