Merge branch 'develop/3.2' of https://github.com/thingsboard/thingsboard into feature/device-provision-3.2-onlyProfileVersion
This commit is contained in:
commit
e86c07599c
@ -282,6 +282,8 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private <T> TransportServiceCallback<Void> getPubAckCallback(final ChannelHandlerContext ctx, final int msgId, final T msg) {
|
||||
return new TransportServiceCallback<Void>() {
|
||||
@Override
|
||||
|
||||
@ -34,7 +34,7 @@ public class JacksonUtil {
|
||||
return fromValue != null ? OBJECT_MAPPER.convertValue(fromValue, toValueType) : null;
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgumentException("The given object value: "
|
||||
+ fromValue + " cannot be converted to " + toValueType);
|
||||
+ fromValue + " cannot be converted to " + toValueType, e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,7 +43,7 @@ public class JacksonUtil {
|
||||
return string != null ? OBJECT_MAPPER.readValue(string, clazz) : null;
|
||||
} catch (IOException e) {
|
||||
throw new IllegalArgumentException("The given string value: "
|
||||
+ string + " cannot be transformed to Json object");
|
||||
+ string + " cannot be transformed to Json object", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,7 +52,7 @@ public class JacksonUtil {
|
||||
return value != null ? OBJECT_MAPPER.writeValueAsString(value) : null;
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IllegalArgumentException("The given Json object value: "
|
||||
+ value + " cannot be transformed to a String");
|
||||
+ value + " cannot be transformed to a String", e);
|
||||
}
|
||||
}
|
||||
|
||||
@ -71,7 +71,7 @@ public class JacksonUtil {
|
||||
return fromString(toString(value), (Class<T>) value.getClass());
|
||||
}
|
||||
|
||||
public static JsonNode valueToTree(Alarm alarm) {
|
||||
public static <T> JsonNode valueToTree(T alarm) {
|
||||
return OBJECT_MAPPER.valueToTree(alarm);
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,6 +15,6 @@
|
||||
--
|
||||
|
||||
DROP TYPE json IF EXISTS;
|
||||
CREATE TYPE json AS text;
|
||||
CREATE TYPE json AS varchar;
|
||||
DROP TYPE jsonb IF EXISTS;
|
||||
CREATE TYPE jsonb AS other;
|
||||
|
||||
@ -47,16 +47,14 @@ import java.util.concurrent.ExecutionException;
|
||||
class DeviceProfileAlarmState {
|
||||
|
||||
private final EntityId originator;
|
||||
private final DeviceProfileAlarm alarmDefinition;
|
||||
private DeviceProfileAlarm alarmDefinition;
|
||||
private volatile Map<AlarmSeverity, AlarmRule> createRulesSortedBySeverityDesc;
|
||||
private volatile Alarm currentAlarm;
|
||||
private volatile boolean initialFetchDone;
|
||||
|
||||
public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) {
|
||||
this.originator = originator;
|
||||
this.alarmDefinition = alarmDefinition;
|
||||
this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal));
|
||||
this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules());
|
||||
this.updateState(alarmDefinition);
|
||||
}
|
||||
|
||||
public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException {
|
||||
@ -111,6 +109,12 @@ class DeviceProfileAlarmState {
|
||||
ctx.tellNext(newMsg, relationType);
|
||||
}
|
||||
|
||||
public void updateState(DeviceProfileAlarm alarm) {
|
||||
this.alarmDefinition = alarm;
|
||||
this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal));
|
||||
this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules());
|
||||
}
|
||||
|
||||
private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) {
|
||||
if (currentAlarm != null) {
|
||||
currentAlarm.setEndTs(System.currentTimeMillis());
|
||||
|
||||
@ -20,6 +20,7 @@ import lombok.Getter;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.device.profile.AlarmRule;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.query.EntityKey;
|
||||
import org.thingsboard.server.common.data.query.KeyFilter;
|
||||
|
||||
@ -55,4 +56,8 @@ class DeviceProfileState {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public DeviceProfileId getProfileId() {
|
||||
return deviceProfile.getId();
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,8 +20,10 @@ import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.DeviceProfileId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
@ -40,20 +42,44 @@ import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
class DeviceState {
|
||||
|
||||
private final DeviceId deviceId;
|
||||
private DeviceProfileState deviceProfile;
|
||||
private DeviceDataSnapshot latestValues;
|
||||
private final ConcurrentMap<String, DeviceProfileAlarmState> alarmStates = new ConcurrentHashMap<>();
|
||||
|
||||
public DeviceState(DeviceProfileState deviceProfile) {
|
||||
public DeviceState(DeviceId deviceId, DeviceProfileState deviceProfile) {
|
||||
this.deviceId = deviceId;
|
||||
this.deviceProfile = deviceProfile;
|
||||
}
|
||||
|
||||
public void updateProfile(TbContext ctx, DeviceProfile deviceProfile) throws ExecutionException, InterruptedException {
|
||||
Set<EntityKey> oldKeys = this.deviceProfile.getEntityKeys();
|
||||
this.deviceProfile.updateDeviceProfile(deviceProfile);
|
||||
if (latestValues != null) {
|
||||
Set<EntityKey> keysToFetch = new HashSet<>(this.deviceProfile.getEntityKeys());
|
||||
keysToFetch.removeAll(oldKeys);
|
||||
if (!keysToFetch.isEmpty()) {
|
||||
addEntityKeysToSnapshot(ctx, deviceId, keysToFetch, latestValues);
|
||||
}
|
||||
}
|
||||
Set<String> newAlarmStateIds = this.deviceProfile.getAlarmSettings().stream().map(DeviceProfileAlarm::getId).collect(Collectors.toSet());
|
||||
alarmStates.keySet().removeIf(id -> !newAlarmStateIds.contains(id));
|
||||
for (DeviceProfileAlarm alarm : this.deviceProfile.getAlarmSettings()) {
|
||||
if (alarmStates.containsKey(alarm.getId())) {
|
||||
alarmStates.get(alarm.getId()).updateState(alarm);
|
||||
} else {
|
||||
alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
|
||||
if (latestValues == null) {
|
||||
latestValues = fetchLatestValues(ctx, msg.getOriginator());
|
||||
latestValues = fetchLatestValues(ctx, deviceId);
|
||||
}
|
||||
if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
|
||||
processTelemetry(ctx, msg);
|
||||
@ -69,7 +95,7 @@ class DeviceState {
|
||||
List<KvEntry> data = entry.getValue();
|
||||
latestValues = merge(latestValues, ts, data);
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(msg.getOriginator(), alarm));
|
||||
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm));
|
||||
alarmState.process(ctx, msg, latestValues);
|
||||
}
|
||||
}
|
||||
@ -85,8 +111,13 @@ class DeviceState {
|
||||
}
|
||||
|
||||
private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException {
|
||||
DeviceDataSnapshot result = new DeviceDataSnapshot(deviceProfile.getEntityKeys());
|
||||
Set<EntityKey> entityKeysToFetch = deviceProfile.getEntityKeys();
|
||||
DeviceDataSnapshot result = new DeviceDataSnapshot(entityKeysToFetch);
|
||||
addEntityKeysToSnapshot(ctx, originator, entityKeysToFetch, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void addEntityKeysToSnapshot(TbContext ctx, EntityId originator, Set<EntityKey> entityKeysToFetch, DeviceDataSnapshot result) throws InterruptedException, ExecutionException {
|
||||
Set<String> serverAttributeKeys = new HashSet<>();
|
||||
Set<String> clientAttributeKeys = new HashSet<>();
|
||||
Set<String> sharedAttributeKeys = new HashSet<>();
|
||||
@ -94,7 +125,7 @@ class DeviceState {
|
||||
Set<String> latestTsKeys = new HashSet<>();
|
||||
|
||||
Device device = null;
|
||||
for (EntityKey entityKey : deviceProfile.getEntityKeys()) {
|
||||
for (EntityKey entityKey : entityKeysToFetch) {
|
||||
String key = entityKey.getKey();
|
||||
switch (entityKey.getType()) {
|
||||
case SERVER_ATTRIBUTE:
|
||||
@ -159,8 +190,6 @@ class DeviceState {
|
||||
addToSnapshot(result, commonAttributeKeys,
|
||||
ctx.getAttributesService().find(ctx.getTenantId(), originator, DataConstants.SERVER_SCOPE, serverAttributeKeys).get());
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private void addToSnapshot(DeviceDataSnapshot snapshot, Set<String> commonAttributeKeys, List<AttributeKvEntry> data) {
|
||||
@ -192,4 +221,8 @@ class DeviceState {
|
||||
}
|
||||
}
|
||||
|
||||
public DeviceProfileId getProfileId() {
|
||||
return deviceProfile.getProfileId();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,15 +16,6 @@
|
||||
package org.thingsboard.rule.engine.profile;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.BooleanUtils;
|
||||
import org.apache.kafka.clients.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
import org.apache.kafka.common.header.internals.RecordHeader;
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
@ -32,22 +23,14 @@ import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNode;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
import org.thingsboard.rule.engine.api.TbNodeException;
|
||||
import org.thingsboard.rule.engine.api.TbRelationTypes;
|
||||
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
||||
import org.thingsboard.rule.engine.kafka.TbKafkaNodeConfiguration;
|
||||
import org.thingsboard.server.common.data.DeviceProfile;
|
||||
import org.thingsboard.server.common.data.EntityType;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@ -76,7 +59,6 @@ public class TbDeviceProfileNode implements TbNode {
|
||||
/**
|
||||
* TODO:
|
||||
* 1. Duration in the alarm conditions;
|
||||
* 2. Update of the Profile (rules);
|
||||
* 3. Update of the Device attributes (client, server and shared);
|
||||
* 4. Dynamic values evaluation;
|
||||
*/
|
||||
@ -86,26 +68,37 @@ public class TbDeviceProfileNode implements TbNode {
|
||||
EntityType originatorType = msg.getOriginator().getEntityType();
|
||||
if (EntityType.DEVICE.equals(originatorType)) {
|
||||
DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
|
||||
DeviceState deviceState = getOrCreateDeviceState(ctx, msg, deviceId);
|
||||
if (deviceState != null) {
|
||||
deviceState.process(ctx, msg);
|
||||
if (msg.getType().equals("ENTITY_UPDATED")) {
|
||||
//TODO: handle if device profile id has changed.
|
||||
} else {
|
||||
ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
|
||||
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId);
|
||||
if (deviceState != null) {
|
||||
deviceState.process(ctx, msg);
|
||||
} else {
|
||||
ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
|
||||
}
|
||||
}
|
||||
} else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
|
||||
//TODO: check that the profile rule set was changed. If yes - invalidate the rules.
|
||||
if (msg.getType().equals("ENTITY_UPDATED")) {
|
||||
DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class);
|
||||
for (DeviceState state : deviceStates.values()) {
|
||||
if (deviceProfile.getId().equals(state.getProfileId())) {
|
||||
state.updateProfile(ctx, deviceProfile);
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx.tellSuccess(msg);
|
||||
} else {
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private DeviceState getOrCreateDeviceState(TbContext ctx, TbMsg msg, DeviceId deviceId) {
|
||||
private DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) {
|
||||
DeviceState deviceState = deviceStates.get(deviceId);
|
||||
if (deviceState == null) {
|
||||
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
|
||||
if (deviceProfile != null) {
|
||||
deviceState = new DeviceState(new DeviceProfileState(deviceProfile));
|
||||
deviceState = new DeviceState(deviceId, new DeviceProfileState(deviceProfile));
|
||||
deviceStates.put(deviceId, deviceState);
|
||||
}
|
||||
}
|
||||
|
||||
@ -127,8 +127,8 @@ export class DeviceProfileComponent extends EntityComponent<DeviceProfile> {
|
||||
|
||||
updateForm(entity: DeviceProfile) {
|
||||
this.entityForm.patchValue({name: entity.name});
|
||||
this.entityForm.patchValue({type: entity.type});
|
||||
this.entityForm.patchValue({transportType: entity.transportType});
|
||||
this.entityForm.patchValue({type: entity.type}, {emitEvent: false});
|
||||
this.entityForm.patchValue({transportType: entity.transportType}, {emitEvent: false});
|
||||
this.entityForm.patchValue({profileData: entity.profileData});
|
||||
this.entityForm.patchValue({defaultRuleChainId: entity.defaultRuleChainId ? entity.defaultRuleChainId.id : null});
|
||||
this.entityForm.patchValue({description: entity.description});
|
||||
|
||||
@ -17,11 +17,11 @@
|
||||
-->
|
||||
<mat-form-field [formGroup]="selectRuleChainFormGroup" class="mat-block">
|
||||
<input matInput type="text" placeholder="{{ ruleChainLabel | translate }}"
|
||||
#entityInput
|
||||
#ruleChainInput
|
||||
formControlName="ruleChainId"
|
||||
(focusin)="onFocus()"
|
||||
[required]="required"
|
||||
[matAutocomplete]="entityAutocomplete">
|
||||
[matAutocomplete]="ruleChainAutocomplete">
|
||||
<button *ngIf="selectRuleChainFormGroup.get('ruleChainId').value && !disabled"
|
||||
type="button"
|
||||
matSuffix mat-button mat-icon-button aria-label="Clear"
|
||||
@ -29,10 +29,10 @@
|
||||
<mat-icon class="material-icons">close</mat-icon>
|
||||
</button>
|
||||
<mat-autocomplete class="tb-autocomplete"
|
||||
#entityAutocomplete="matAutocomplete"
|
||||
[displayWith]="displayEntityFn">
|
||||
<mat-option *ngFor="let entity of filteredRuleChains | async" [value]="entity">
|
||||
<span [innerHTML]="entity.name | highlight:searchText"></span>
|
||||
#ruleChainAutocomplete="matAutocomplete"
|
||||
[displayWith]="displayRuleChainFn">
|
||||
<mat-option *ngFor="let ruleChain of filteredRuleChains | async" [value]="ruleChain">
|
||||
<span [innerHTML]="ruleChain.name | highlight:searchText"></span>
|
||||
</mat-option>
|
||||
<mat-option *ngIf="!(filteredRuleChains | async)?.length" [value]="null" class="tb-not-found">
|
||||
<div class="tb-not-found-content" (click)="$event.stopPropagation()">
|
||||
|
||||
@ -66,7 +66,8 @@ export class RuleChainAutocompleteComponent implements ControlValueAccessor, OnI
|
||||
@Input()
|
||||
disabled: boolean;
|
||||
|
||||
@ViewChild('entityInput', {static: true}) entityInput: ElementRef;
|
||||
@ViewChild('ruleChainInput', {static: true}) ruleChainInput: ElementRef;
|
||||
@ViewChild('ruleChainInput', {read: MatAutocompleteTrigger}) ruleChainAutocomplete: MatAutocompleteTrigger;
|
||||
|
||||
filteredRuleChains: Observable<Array<BaseData<EntityId>>>;
|
||||
|
||||
@ -117,9 +118,9 @@ export class RuleChainAutocompleteComponent implements ControlValueAccessor, OnI
|
||||
ngAfterViewInit(): void {}
|
||||
|
||||
getCurrentEntity(): BaseData<EntityId> | null {
|
||||
const currentEntity = this.selectRuleChainFormGroup.get('ruleChainId').value;
|
||||
if (currentEntity && typeof currentEntity !== 'string') {
|
||||
return currentEntity as BaseData<EntityId>;
|
||||
const currentRuleChain = this.selectRuleChainFormGroup.get('ruleChainId').value;
|
||||
if (currentRuleChain && typeof currentRuleChain !== 'string') {
|
||||
return currentRuleChain as BaseData<EntityId>;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
@ -180,8 +181,8 @@ export class RuleChainAutocompleteComponent implements ControlValueAccessor, OnI
|
||||
}
|
||||
}
|
||||
|
||||
displayEntityFn(entity?: BaseData<EntityId>): string | undefined {
|
||||
return entity ? entity.name : undefined;
|
||||
displayRuleChainFn(ruleChain?: BaseData<EntityId>): string | undefined {
|
||||
return ruleChain ? ruleChain.name : undefined;
|
||||
}
|
||||
|
||||
fetchRuleChain(searchText?: string): Observable<Array<BaseData<EntityId>>> {
|
||||
@ -193,12 +194,14 @@ export class RuleChainAutocompleteComponent implements ControlValueAccessor, OnI
|
||||
clear() {
|
||||
this.selectRuleChainFormGroup.get('ruleChainId').patchValue('', {emitEvent: true});
|
||||
setTimeout(() => {
|
||||
this.entityInput.nativeElement.blur();
|
||||
this.entityInput.nativeElement.focus();
|
||||
this.ruleChainInput.nativeElement.blur();
|
||||
this.ruleChainInput.nativeElement.focus();
|
||||
}, 0);
|
||||
}
|
||||
|
||||
createDefaultRuleChain($event: Event, ruleChainName: string) {
|
||||
$event.preventDefault();
|
||||
this.ruleChainAutocomplete.closePanel();
|
||||
this.ruleChainService.createDefaultRuleChain(ruleChainName).subscribe((ruleChain) => {
|
||||
this.updateView(ruleChain.id.id);
|
||||
});
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user