diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index 866a4f7627..af4bc4d0d7 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -243,6 +243,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } } + + private TransportServiceCallback getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) { return new TransportServiceCallback() { @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java b/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java index d17fbe1e83..6b73b8c6b2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/mapping/JacksonUtil.java @@ -34,7 +34,7 @@ public class JacksonUtil { return fromValue != null ? OBJECT_MAPPER.convertValue(fromValue, toValueType) : null; } catch (IllegalArgumentException e) { throw new IllegalArgumentException("The given object value: " - + fromValue + " cannot be converted to " + toValueType); + + fromValue + " cannot be converted to " + toValueType, e); } } @@ -43,7 +43,7 @@ public class JacksonUtil { return string != null ? OBJECT_MAPPER.readValue(string, clazz) : null; } catch (IOException e) { throw new IllegalArgumentException("The given string value: " - + string + " cannot be transformed to Json object"); + + string + " cannot be transformed to Json object", e); } } @@ -52,7 +52,7 @@ public class JacksonUtil { return value != null ? OBJECT_MAPPER.writeValueAsString(value) : null; } catch (JsonProcessingException e) { throw new IllegalArgumentException("The given Json object value: " - + value + " cannot be transformed to a String"); + + value + " cannot be transformed to a String", e); } } @@ -71,7 +71,7 @@ public class JacksonUtil { return fromString(toString(value), (Class) value.getClass()); } - public static JsonNode valueToTree(Alarm alarm) { + public static JsonNode valueToTree(T alarm) { return OBJECT_MAPPER.valueToTree(alarm); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java index cdc44eaf3f..83bfd2b3d3 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java @@ -47,16 +47,14 @@ import java.util.concurrent.ExecutionException; class DeviceProfileAlarmState { private final EntityId originator; - private final DeviceProfileAlarm alarmDefinition; + private DeviceProfileAlarm alarmDefinition; private volatile Map createRulesSortedBySeverityDesc; private volatile Alarm currentAlarm; private volatile boolean initialFetchDone; public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) { this.originator = originator; - this.alarmDefinition = alarmDefinition; - this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal)); - this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules()); + this.updateState(alarmDefinition); } public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException { @@ -111,6 +109,12 @@ class DeviceProfileAlarmState { ctx.tellNext(newMsg, relationType); } + public void updateState(DeviceProfileAlarm alarm) { + this.alarmDefinition = alarm; + this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal)); + this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules()); + } + private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) { if (currentAlarm != null) { currentAlarm.setEndTs(System.currentTimeMillis()); diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileState.java index 8a23091c2b..fd9037624e 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileState.java @@ -20,6 +20,7 @@ import lombok.Getter; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.AlarmRule; import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.KeyFilter; @@ -55,4 +56,8 @@ class DeviceProfileState { } } } + + public DeviceProfileId getProfileId() { + return deviceProfile.getId(); + } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index 3ffe140928..3a9c08f1e8 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -20,8 +20,10 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; @@ -40,20 +42,44 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; class DeviceState { + private final DeviceId deviceId; private DeviceProfileState deviceProfile; private DeviceDataSnapshot latestValues; private final ConcurrentMap alarmStates = new ConcurrentHashMap<>(); - public DeviceState(DeviceProfileState deviceProfile) { + public DeviceState(DeviceId deviceId, DeviceProfileState deviceProfile) { + this.deviceId = deviceId; this.deviceProfile = deviceProfile; } + public void updateProfile(TbContext ctx, DeviceProfile deviceProfile) throws ExecutionException, InterruptedException { + Set oldKeys = this.deviceProfile.getEntityKeys(); + this.deviceProfile.updateDeviceProfile(deviceProfile); + if (latestValues != null) { + Set keysToFetch = new HashSet<>(this.deviceProfile.getEntityKeys()); + keysToFetch.removeAll(oldKeys); + if (!keysToFetch.isEmpty()) { + addEntityKeysToSnapshot(ctx, deviceId, keysToFetch, latestValues); + } + } + Set newAlarmStateIds = this.deviceProfile.getAlarmSettings().stream().map(DeviceProfileAlarm::getId).collect(Collectors.toSet()); + alarmStates.keySet().removeIf(id -> !newAlarmStateIds.contains(id)); + for (DeviceProfileAlarm alarm : this.deviceProfile.getAlarmSettings()) { + if (alarmStates.containsKey(alarm.getId())) { + alarmStates.get(alarm.getId()).updateState(alarm); + } else { + alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm)); + } + } + } + public void process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { if (latestValues == null) { - latestValues = fetchLatestValues(ctx, msg.getOriginator()); + latestValues = fetchLatestValues(ctx, deviceId); } if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { processTelemetry(ctx, msg); @@ -69,7 +95,7 @@ class DeviceState { List data = entry.getValue(); latestValues = merge(latestValues, ts, data); for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(msg.getOriginator(), alarm)); + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); alarmState.process(ctx, msg, latestValues); } } @@ -85,8 +111,13 @@ class DeviceState { } private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException { - DeviceDataSnapshot result = new DeviceDataSnapshot(deviceProfile.getEntityKeys()); + Set entityKeysToFetch = deviceProfile.getEntityKeys(); + DeviceDataSnapshot result = new DeviceDataSnapshot(entityKeysToFetch); + addEntityKeysToSnapshot(ctx, originator, entityKeysToFetch, result); + return result; + } + private void addEntityKeysToSnapshot(TbContext ctx, EntityId originator, Set entityKeysToFetch, DeviceDataSnapshot result) throws InterruptedException, ExecutionException { Set serverAttributeKeys = new HashSet<>(); Set clientAttributeKeys = new HashSet<>(); Set sharedAttributeKeys = new HashSet<>(); @@ -94,7 +125,7 @@ class DeviceState { Set latestTsKeys = new HashSet<>(); Device device = null; - for (EntityKey entityKey : deviceProfile.getEntityKeys()) { + for (EntityKey entityKey : entityKeysToFetch) { String key = entityKey.getKey(); switch (entityKey.getType()) { case SERVER_ATTRIBUTE: @@ -159,8 +190,6 @@ class DeviceState { addToSnapshot(result, commonAttributeKeys, ctx.getAttributesService().find(ctx.getTenantId(), originator, DataConstants.SERVER_SCOPE, serverAttributeKeys).get()); } - - return result; } private void addToSnapshot(DeviceDataSnapshot snapshot, Set commonAttributeKeys, List data) { @@ -192,4 +221,8 @@ class DeviceState { } } + public DeviceProfileId getProfileId() { + return deviceProfile.getProfileId(); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java index 52b6d2aabd..4d0a714f25 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java @@ -16,15 +16,6 @@ package org.thingsboard.rule.engine.profile; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.BooleanUtils; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.header.internals.RecordHeader; -import org.apache.kafka.common.header.internals.RecordHeaders; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; import org.thingsboard.rule.engine.api.RuleNode; @@ -32,22 +23,14 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.rule.engine.api.TbRelationTypes; -import org.thingsboard.rule.engine.api.util.TbNodeUtils; -import org.thingsboard.rule.engine.kafka.TbKafkaNodeConfiguration; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; -import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.util.mapping.JacksonUtil; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; import java.util.Map; -import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -76,7 +59,6 @@ public class TbDeviceProfileNode implements TbNode { /** * TODO: * 1. Duration in the alarm conditions; - * 2. Update of the Profile (rules); * 3. Update of the Device attributes (client, server and shared); * 4. Dynamic values evaluation; */ @@ -86,26 +68,37 @@ public class TbDeviceProfileNode implements TbNode { EntityType originatorType = msg.getOriginator().getEntityType(); if (EntityType.DEVICE.equals(originatorType)) { DeviceId deviceId = new DeviceId(msg.getOriginator().getId()); - DeviceState deviceState = getOrCreateDeviceState(ctx, msg, deviceId); - if (deviceState != null) { - deviceState.process(ctx, msg); + if (msg.getType().equals("ENTITY_UPDATED")) { + //TODO: handle if device profile id has changed. } else { - ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!")); + DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId); + if (deviceState != null) { + deviceState.process(ctx, msg); + } else { + ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!")); + } } } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) { - //TODO: check that the profile rule set was changed. If yes - invalidate the rules. + if (msg.getType().equals("ENTITY_UPDATED")) { + DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class); + for (DeviceState state : deviceStates.values()) { + if (deviceProfile.getId().equals(state.getProfileId())) { + state.updateProfile(ctx, deviceProfile); + } + } + } ctx.tellSuccess(msg); } else { ctx.tellSuccess(msg); } } - private DeviceState getOrCreateDeviceState(TbContext ctx, TbMsg msg, DeviceId deviceId) { + private DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) { DeviceState deviceState = deviceStates.get(deviceId); if (deviceState == null) { DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); if (deviceProfile != null) { - deviceState = new DeviceState(new DeviceProfileState(deviceProfile)); + deviceState = new DeviceState(deviceId, new DeviceProfileState(deviceProfile)); deviceStates.put(deviceId, deviceState); } }