addede current customer and current tenant dynamic source types to DeviceProfile key filter
This commit is contained in:
parent
86abe35835
commit
e1be593e04
@ -55,8 +55,9 @@ class AlarmRuleState {
|
||||
private final Set<EntityKey> entityKeys;
|
||||
private PersistedAlarmRuleState state;
|
||||
private boolean updateFlag;
|
||||
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
|
||||
|
||||
AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, Set<EntityKey> entityKeys, PersistedAlarmRuleState state) {
|
||||
AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, Set<EntityKey> entityKeys, PersistedAlarmRuleState state, DynamicPredicateValueCtx dynamicPredicateValueCtx) {
|
||||
this.severity = severity;
|
||||
this.alarmRule = alarmRule;
|
||||
this.entityKeys = entityKeys;
|
||||
@ -80,6 +81,7 @@ class AlarmRuleState {
|
||||
}
|
||||
this.requiredDurationInMs = requiredDurationInMs;
|
||||
this.requiredRepeats = requiredRepeats;
|
||||
this.dynamicPredicateValueCtx = dynamicPredicateValueCtx;
|
||||
}
|
||||
|
||||
public boolean validateTsUpdate(Set<EntityKey> changedKeys) {
|
||||
@ -385,15 +387,24 @@ class AlarmRuleState {
|
||||
private <T> EntityKeyValue getDynamicPredicateValue(DataSnapshot data, FilterPredicateValue<T> value) {
|
||||
EntityKeyValue ekv = null;
|
||||
if (value.getDynamicValue() != null) {
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
if (ekv == null) {
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
if (ekv == null) {
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
switch (value.getDynamicValue().getSourceType()) {
|
||||
case CURRENT_TENANT:
|
||||
ekv = dynamicPredicateValueCtx.getTenantValue(value.getDynamicValue().getSourceAttribute());
|
||||
break;
|
||||
case CURRENT_CUSTOMER:
|
||||
ekv = dynamicPredicateValueCtx.getCustomerValue(value.getDynamicValue().getSourceAttribute());
|
||||
break;
|
||||
case CURRENT_DEVICE:
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
if (ekv == null) {
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.SERVER_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
if (ekv == null) {
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.SHARED_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
if (ekv == null) {
|
||||
ekv = data.getValue(new EntityKey(EntityKeyType.CLIENT_ATTRIBUTE, value.getDynamicValue().getSourceAttribute()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ekv;
|
||||
|
||||
@ -25,11 +25,14 @@ import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState;
|
||||
import org.thingsboard.rule.engine.profile.state.PersistedAlarmState;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmStatus;
|
||||
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.query.EntityKeyType;
|
||||
import org.thingsboard.server.common.data.query.KeyFilter;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
@ -57,10 +60,12 @@ class AlarmState {
|
||||
private volatile TbMsgMetaData lastMsgMetaData;
|
||||
private volatile String lastMsgQueueName;
|
||||
private volatile DataSnapshot dataSnapshot;
|
||||
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
|
||||
|
||||
AlarmState(ProfileState deviceProfile, EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState) {
|
||||
AlarmState(ProfileState deviceProfile, EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState, DynamicPredicateValueCtx dynamicPredicateValueCtx) {
|
||||
this.deviceProfile = deviceProfile;
|
||||
this.originator = originator;
|
||||
this.dynamicPredicateValueCtx = dynamicPredicateValueCtx;
|
||||
this.updateState(alarmDefinition, alarmState);
|
||||
}
|
||||
|
||||
@ -188,12 +193,12 @@ class AlarmState {
|
||||
}
|
||||
}
|
||||
createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule,
|
||||
deviceProfile.getCreateAlarmKeys(alarm.getId(), severity), ruleState));
|
||||
deviceProfile.getCreateAlarmKeys(alarm.getId(), severity), ruleState, dynamicPredicateValueCtx));
|
||||
});
|
||||
createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal()));
|
||||
PersistedAlarmRuleState ruleState = alarmState == null ? null : alarmState.getClearRuleState();
|
||||
if (alarmDefinition.getClearRule() != null) {
|
||||
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), deviceProfile.getClearAlarmKeys(alarm.getId()), ruleState);
|
||||
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), deviceProfile.getClearAlarmKeys(alarm.getId()), ruleState, dynamicPredicateValueCtx);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -63,11 +63,15 @@ class DeviceState {
|
||||
private PersistedDeviceState pds;
|
||||
private DataSnapshot latestValues;
|
||||
private final ConcurrentMap<String, AlarmState> alarmStates = new ConcurrentHashMap<>();
|
||||
private final DynamicPredicateValueCtx dynamicPredicateValueCtx;
|
||||
|
||||
DeviceState(TbContext ctx, TbDeviceProfileNodeConfiguration config, DeviceId deviceId, ProfileState deviceProfile, RuleNodeState state) {
|
||||
this.persistState = config.isPersistAlarmRulesState();
|
||||
this.deviceId = deviceId;
|
||||
this.deviceProfile = deviceProfile;
|
||||
|
||||
this.dynamicPredicateValueCtx = new DynamicPredicateValueCtxImpl(ctx.getTenantId(), deviceId, ctx);
|
||||
|
||||
if (config.isPersistAlarmRulesState()) {
|
||||
if (state != null) {
|
||||
this.state = state;
|
||||
@ -87,7 +91,7 @@ class DeviceState {
|
||||
if (pds != null) {
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
alarmStates.computeIfAbsent(alarm.getId(),
|
||||
a -> new AlarmState(deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
a -> new AlarmState(deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -108,7 +112,7 @@ class DeviceState {
|
||||
if (alarmStates.containsKey(alarm.getId())) {
|
||||
alarmStates.get(alarm.getId()).updateState(alarm, getOrInitPersistedAlarmState(alarm));
|
||||
} else {
|
||||
alarmStates.putIfAbsent(alarm.getId(), new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
alarmStates.putIfAbsent(alarm.getId(), new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -143,6 +147,9 @@ class DeviceState {
|
||||
} else if (msg.getType().equals(DataConstants.ALARM_ACK)) {
|
||||
processAlarmAckNotification(ctx, msg);
|
||||
} else {
|
||||
if (msg.getType().equals(DataConstants.ENTITY_ASSIGNED) || msg.getType().equals(DataConstants.ENTITY_UNASSIGNED)) {
|
||||
dynamicPredicateValueCtx.resetCustomer();
|
||||
}
|
||||
ctx.tellSuccess(msg);
|
||||
}
|
||||
if (persistState && stateChanged) {
|
||||
@ -156,7 +163,7 @@ class DeviceState {
|
||||
Alarm alarmNf = JacksonUtil.fromString(msg.getData(), Alarm.class);
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
stateChanged |= alarmState.processAlarmClear(ctx, alarmNf);
|
||||
}
|
||||
ctx.tellSuccess(msg);
|
||||
@ -167,7 +174,7 @@ class DeviceState {
|
||||
Alarm alarmNf = JacksonUtil.fromString(msg.getData(), Alarm.class);
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
alarmState.processAckAlarm(alarmNf);
|
||||
}
|
||||
ctx.tellSuccess(msg);
|
||||
@ -195,7 +202,7 @@ class DeviceState {
|
||||
keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key)));
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
stateChanged |= alarmState.process(ctx, msg, latestValues, null);
|
||||
}
|
||||
}
|
||||
@ -214,7 +221,7 @@ class DeviceState {
|
||||
SnapshotUpdate update = merge(latestValues, attributes, scope);
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
stateChanged |= alarmState.process(ctx, msg, latestValues, update);
|
||||
}
|
||||
}
|
||||
@ -233,7 +240,7 @@ class DeviceState {
|
||||
if (update.hasUpdate()) {
|
||||
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
|
||||
AlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
|
||||
a -> new AlarmState(this.deviceProfile, deviceId, alarm, getOrInitPersistedAlarmState(alarm), dynamicPredicateValueCtx));
|
||||
stateChanged |= alarmState.process(ctx, msg, latestValues, update);
|
||||
}
|
||||
}
|
||||
@ -374,7 +381,7 @@ class DeviceState {
|
||||
}
|
||||
}
|
||||
|
||||
private EntityKeyValue toEntityValue(KvEntry entry) {
|
||||
public static EntityKeyValue toEntityValue(KvEntry entry) {
|
||||
switch (entry.getDataType()) {
|
||||
case STRING:
|
||||
return EntityKeyValue.fromString(entry.getStrValue().get());
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.profile;
|
||||
|
||||
public interface DynamicPredicateValueCtx {
|
||||
|
||||
EntityKeyValue getTenantValue(String key);
|
||||
|
||||
EntityKeyValue getCustomerValue(String key);
|
||||
|
||||
void resetCustomer();
|
||||
}
|
||||
@ -0,0 +1,74 @@
|
||||
/**
|
||||
* Copyright © 2016-2021 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.rule.engine.profile;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.server.common.data.DataConstants;
|
||||
import org.thingsboard.server.common.data.Device;
|
||||
import org.thingsboard.server.common.data.id.CustomerId;
|
||||
import org.thingsboard.server.common.data.id.DeviceId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
@Slf4j
|
||||
public class DynamicPredicateValueCtxImpl implements DynamicPredicateValueCtx {
|
||||
private final TenantId tenantId;
|
||||
private CustomerId customerId;
|
||||
private final DeviceId deviceId;
|
||||
private final TbContext ctx;
|
||||
|
||||
public DynamicPredicateValueCtxImpl(TenantId tenantId, DeviceId deviceId, TbContext ctx) {
|
||||
this.tenantId = tenantId;
|
||||
this.deviceId = deviceId;
|
||||
this.ctx = ctx;
|
||||
resetCustomer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityKeyValue getTenantValue(String key) {
|
||||
return getValue(tenantId, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public EntityKeyValue getCustomerValue(String key) {
|
||||
return customerId == null || customerId.isNullUid() ? null : getValue(customerId, key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetCustomer() {
|
||||
Device device = ctx.getDeviceService().findDeviceById(tenantId, deviceId);
|
||||
if (device != null) {
|
||||
this.customerId = device.getCustomerId();
|
||||
}
|
||||
}
|
||||
|
||||
private EntityKeyValue getValue(EntityId entityId, String key) {
|
||||
try {
|
||||
Optional<AttributeKvEntry> entry = ctx.getAttributesService().find(tenantId, entityId, DataConstants.SERVER_SCOPE, key).get();
|
||||
if (entry.isPresent()) {
|
||||
return DeviceState.toEntityValue(entry.get());
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.warn("Failed to get attribute by key: {} for {}: [{}]", key, entityId.getEntityType(), entityId.getId());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@ -95,7 +95,9 @@ class ProfileState {
|
||||
case NUMERIC:
|
||||
case BOOLEAN:
|
||||
DynamicValue value = ((SimpleKeyFilterPredicate) predicate).getValue().getDynamicValue();
|
||||
if (value != null && value.getSourceType() == DynamicValueSourceType.CURRENT_DEVICE) {
|
||||
if (value != null && (value.getSourceType() == DynamicValueSourceType.CURRENT_TENANT ||
|
||||
value.getSourceType() == DynamicValueSourceType.CURRENT_CUSTOMER ||
|
||||
value.getSourceType() == DynamicValueSourceType.CURRENT_DEVICE)) {
|
||||
EntityKey entityKey = new EntityKey(EntityKeyType.ATTRIBUTE, value.getSourceAttribute());
|
||||
entityKeys.add(entityKey);
|
||||
ruleKeys.add(entityKey);
|
||||
|
||||
@ -54,7 +54,7 @@ export class FilterPredicateValueComponent implements ControlValueAccessor, OnIn
|
||||
if (allow) {
|
||||
this.dynamicValueSourceTypes.push(DynamicValueSourceType.CURRENT_USER);
|
||||
} else {
|
||||
this.dynamicValueSourceTypes = [DynamicValueSourceType.CURRENT_DEVICE];
|
||||
this.dynamicValueSourceTypes.push(DynamicValueSourceType.CURRENT_DEVICE);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user