Merge pull request #193 from thingsboard/feature/TB-69
TB-69: Implementation
This commit is contained in:
commit
47f44d54cf
@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
|
||||
import org.thingsboard.server.common.msg.cluster.ServerAddress;
|
||||
import org.thingsboard.server.common.transport.auth.DeviceAuthService;
|
||||
import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.dao.asset.AssetService;
|
||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||
import org.thingsboard.server.dao.customer.CustomerService;
|
||||
@ -105,6 +106,9 @@ public class ActorSystemContext {
|
||||
@Autowired
|
||||
@Getter private EventService eventService;
|
||||
|
||||
@Autowired
|
||||
@Getter private AlarmService alarmService;
|
||||
|
||||
@Autowired
|
||||
@Getter @Setter private PluginWebSocketMsgEndpoint wsMsgEndpoint;
|
||||
|
||||
|
||||
@ -15,22 +15,27 @@
|
||||
*/
|
||||
package org.thingsboard.server.actors.rule;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.actors.ActorSystemContext;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.*;
|
||||
import org.thingsboard.server.dao.alarm.AlarmService;
|
||||
import org.thingsboard.server.dao.event.EventService;
|
||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
|
||||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceMetaData;
|
||||
import org.thingsboard.server.extensions.api.rules.RuleContext;
|
||||
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public class RuleProcessingContext implements RuleContext {
|
||||
|
||||
private final TimeseriesService tsService;
|
||||
private final EventService eventService;
|
||||
private final AlarmService alarmService;
|
||||
private final RuleId ruleId;
|
||||
private TenantId tenantId;
|
||||
private CustomerId customerId;
|
||||
@ -40,6 +45,7 @@ public class RuleProcessingContext implements RuleContext {
|
||||
RuleProcessingContext(ActorSystemContext systemContext, RuleId ruleId) {
|
||||
this.tsService = systemContext.getTsService();
|
||||
this.eventService = systemContext.getEventService();
|
||||
this.alarmService = systemContext.getAlarmService();
|
||||
this.ruleId = ruleId;
|
||||
}
|
||||
|
||||
@ -77,6 +83,25 @@ public class RuleProcessingContext implements RuleContext {
|
||||
return eventService.findEvent(tenantId, deviceId, eventType, eventUid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Alarm createOrUpdateAlarm(Alarm alarm) {
|
||||
alarm.setTenantId(tenantId);
|
||||
return alarmService.createOrUpdateAlarm(alarm);
|
||||
}
|
||||
|
||||
public Optional<Alarm> findLatestAlarm(EntityId originator, String alarmType) {
|
||||
try {
|
||||
return Optional.ofNullable(alarmService.findLatestByOriginatorAndType(tenantId, originator, alarmType).get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException("Failed to lookup alarm!", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<Boolean> clearAlarm(AlarmId alarmId, long clearTs) {
|
||||
return alarmService.clearAlarm(alarmId, clearTs);
|
||||
}
|
||||
|
||||
private void checkEvent(Event event) {
|
||||
if (event.getTenantId() == null) {
|
||||
event.setTenantId(tenantId);
|
||||
|
||||
@ -110,7 +110,6 @@ public class EntityRelation {
|
||||
if (to != null ? !to.equals(that.to) : that.to != null) return false;
|
||||
if (type != null ? !type.equals(that.type) : that.type != null) return false;
|
||||
return typeGroup == that.typeGroup;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -18,6 +18,7 @@ package org.thingsboard.server.dao.alarm;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.alarm.*;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.TimePageData;
|
||||
|
||||
/**
|
||||
@ -40,4 +41,6 @@ public interface AlarmService {
|
||||
AlarmSeverity findHighestAlarmSeverity(EntityId entityId, AlarmSearchStatus alarmSearchStatus,
|
||||
AlarmStatus alarmStatus);
|
||||
|
||||
ListenableFuture<Alarm> findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
|
||||
|
||||
}
|
||||
|
||||
@ -27,6 +27,7 @@ import org.springframework.util.StringUtils;
|
||||
import org.thingsboard.server.common.data.Tenant;
|
||||
import org.thingsboard.server.common.data.alarm.*;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.page.TimePageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.common.data.relation.EntityRelation;
|
||||
@ -111,6 +112,10 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
||||
}
|
||||
}
|
||||
|
||||
public ListenableFuture<Alarm> findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type) {
|
||||
return alarmDao.findLatestByOriginatorAndType(tenantId, originator, type);
|
||||
}
|
||||
|
||||
private Alarm createAlarm(Alarm alarm) throws InterruptedException, ExecutionException {
|
||||
log.debug("New Alarm : {}", alarm);
|
||||
Alarm saved = alarmDao.save(alarm);
|
||||
@ -204,15 +209,15 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
||||
validateId(alarmId, "Incorrect alarmId " + alarmId);
|
||||
return Futures.transform(alarmDao.findAlarmByIdAsync(alarmId.getId()),
|
||||
(AsyncFunction<Alarm, AlarmInfo>) alarm1 -> {
|
||||
AlarmInfo alarmInfo = new AlarmInfo(alarm1);
|
||||
return Futures.transform(
|
||||
entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
|
||||
originatorName -> {
|
||||
alarmInfo.setOriginatorName(originatorName);
|
||||
return alarmInfo;
|
||||
}
|
||||
);
|
||||
});
|
||||
AlarmInfo alarmInfo = new AlarmInfo(alarm1);
|
||||
return Futures.transform(
|
||||
entityService.fetchEntityNameAsync(alarmInfo.getOriginator()), (Function<String, AlarmInfo>)
|
||||
originatorName -> {
|
||||
alarmInfo.setOriginatorName(originatorName);
|
||||
return alarmInfo;
|
||||
}
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -234,7 +239,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
||||
));
|
||||
}
|
||||
return Futures.successfulAsList(alarmFutures);
|
||||
});
|
||||
});
|
||||
}
|
||||
return Futures.transform(alarms, new Function<List<AlarmInfo>, TimePageData<AlarmInfo>>() {
|
||||
@Nullable
|
||||
@ -247,7 +252,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
||||
|
||||
@Override
|
||||
public AlarmSeverity findHighestAlarmSeverity(EntityId entityId, AlarmSearchStatus alarmSearchStatus,
|
||||
AlarmStatus alarmStatus) {
|
||||
AlarmStatus alarmStatus) {
|
||||
TimePageLink nextPageLink = new TimePageLink(100);
|
||||
boolean hasNext = true;
|
||||
AlarmSeverity highestSeverity = null;
|
||||
@ -321,7 +326,7 @@ public class BaseAlarmService extends AbstractEntityService implements AlarmServ
|
||||
List<EntityId> parentEntities = relationService.findByQuery(query).get().stream().map(r -> r.getFrom()).collect(Collectors.toList());
|
||||
for (EntityId parentId : parentEntities) {
|
||||
updateAlarmRelation(parentId, alarm.getId(), oldStatus, newStatus);
|
||||
}
|
||||
}
|
||||
updateAlarmRelation(alarm.getOriginator(), alarm.getId(), oldStatus, newStatus);
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
log.warn("[{}] Failed to update relations. Old status: [{}], New status: [{}]", alarm.getId(), oldStatus, newStatus);
|
||||
|
||||
@ -15,9 +15,12 @@
|
||||
*/
|
||||
package org.thingsboard.server.extensions.api.rules;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import org.thingsboard.server.common.data.Event;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmId;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.RuleId;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceMetaData;
|
||||
|
||||
import java.util.Optional;
|
||||
@ -34,4 +37,9 @@ public interface RuleContext {
|
||||
|
||||
Optional<Event> findEvent(String eventType, String eventUid);
|
||||
|
||||
Optional<Alarm> findLatestAlarm(EntityId originator, String alarmType);
|
||||
|
||||
Alarm createOrUpdateAlarm(Alarm alarm);
|
||||
|
||||
ListenableFuture<Boolean> clearAlarm(AlarmId id, long clearTs);
|
||||
}
|
||||
|
||||
@ -18,6 +18,8 @@ package org.thingsboard.server.extensions.api.rules;
|
||||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
|
||||
import org.thingsboard.server.extensions.api.component.ConfigurableComponent;
|
||||
|
||||
import javax.script.ScriptException;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
|
||||
@ -75,18 +75,4 @@ public abstract class BasicJsFilter implements RuleFilter<JsFilterConfiguration>
|
||||
}
|
||||
}
|
||||
|
||||
protected static Object getValue(KvEntry attr) {
|
||||
switch (attr.getDataType()) {
|
||||
case STRING:
|
||||
return attr.getStrValue().get();
|
||||
case LONG:
|
||||
return attr.getLongValue().get();
|
||||
case DOUBLE:
|
||||
return attr.getDoubleValue().get();
|
||||
case BOOLEAN:
|
||||
return attr.getBooleanValue().get();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -16,9 +16,6 @@
|
||||
package org.thingsboard.server.extensions.core.filter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
|
||||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
|
||||
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
|
||||
@ -28,10 +25,6 @@ import org.thingsboard.server.extensions.api.rules.RuleContext;
|
||||
|
||||
import javax.script.Bindings;
|
||||
import javax.script.ScriptException;
|
||||
import javax.script.SimpleBindings;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
@ -40,25 +33,18 @@ import java.util.Map;
|
||||
@Slf4j
|
||||
public class DeviceAttributesFilter extends BasicJsFilter {
|
||||
|
||||
public static final String CLIENT_SIDE = "cs";
|
||||
public static final String SERVER_SIDE = "ss";
|
||||
public static final String SHARED = "shared";
|
||||
|
||||
@Override
|
||||
protected boolean doFilter(RuleContext ctx, ToDeviceActorMsg msg) throws ScriptException {
|
||||
return evaluator.execute(toBindings(ctx.getDeviceMetaData().getDeviceAttributes(), msg != null ? msg.getPayload() : null));
|
||||
}
|
||||
|
||||
private Bindings toBindings(DeviceAttributes attributes, FromDeviceMsg msg) {
|
||||
Bindings bindings = new SimpleBindings();
|
||||
convertListEntries(bindings, CLIENT_SIDE, attributes.getClientSideAttributes());
|
||||
convertListEntries(bindings, SERVER_SIDE, attributes.getServerSideAttributes());
|
||||
convertListEntries(bindings, SHARED, attributes.getServerSidePublicAttributes());
|
||||
Bindings bindings = NashornJsEvaluator.getAttributeBindings(attributes);
|
||||
|
||||
if (msg != null) {
|
||||
switch (msg.getMsgType()) {
|
||||
case POST_ATTRIBUTES_REQUEST:
|
||||
updateBindings(bindings, (UpdateAttributesRequest) msg);
|
||||
bindings = NashornJsEvaluator.updateBindings(bindings, (UpdateAttributesRequest) msg);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@ -66,28 +52,4 @@ public class DeviceAttributesFilter extends BasicJsFilter {
|
||||
return bindings;
|
||||
}
|
||||
|
||||
private void updateBindings(Bindings bindings, UpdateAttributesRequest msg) {
|
||||
Map<String, Object> attrMap = (Map<String, Object>) bindings.get(CLIENT_SIDE);
|
||||
for (AttributeKvEntry attr : msg.getAttributes()) {
|
||||
if (!CLIENT_SIDE.equalsIgnoreCase(attr.getKey()) && !SERVER_SIDE.equalsIgnoreCase(attr.getKey())
|
||||
&& !SHARED.equalsIgnoreCase(attr.getKey())) {
|
||||
bindings.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
attrMap.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
bindings.put(CLIENT_SIDE, attrMap);
|
||||
}
|
||||
|
||||
public static Bindings convertListEntries(Bindings bindings, String attributesVarName, Collection<AttributeKvEntry> attributes) {
|
||||
Map<String, Object> attrMap = new HashMap<>();
|
||||
for (AttributeKvEntry attr : attributes) {
|
||||
if (!CLIENT_SIDE.equalsIgnoreCase(attr.getKey()) && !SERVER_SIDE.equalsIgnoreCase(attr.getKey())
|
||||
&& !SHARED.equalsIgnoreCase(attr.getKey())) {
|
||||
bindings.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
attrMap.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
bindings.put(attributesVarName, attrMap);
|
||||
return bindings;
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,9 +23,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
|
||||
import org.thingsboard.server.extensions.api.component.Filter;
|
||||
import org.thingsboard.server.extensions.api.rules.RuleContext;
|
||||
|
||||
import javax.script.Bindings;
|
||||
import javax.script.ScriptException;
|
||||
import javax.script.SimpleBindings;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -41,7 +39,7 @@ public class DeviceTelemetryFilter extends BasicJsFilter {
|
||||
if (deviceMsg instanceof TelemetryUploadRequest) {
|
||||
TelemetryUploadRequest telemetryMsg = (TelemetryUploadRequest) deviceMsg;
|
||||
for (List<KvEntry> entries : telemetryMsg.getData().values()) {
|
||||
if (evaluator.execute(toBindings(entries))) {
|
||||
if (evaluator.execute(NashornJsEvaluator.toBindings(entries))) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@ -49,12 +47,4 @@ public class DeviceTelemetryFilter extends BasicJsFilter {
|
||||
return false;
|
||||
}
|
||||
|
||||
private Bindings toBindings(List<KvEntry> entries) {
|
||||
Bindings bindings = new SimpleBindings();
|
||||
for (KvEntry entry : entries) {
|
||||
bindings.put(entry.getKey(), getValue(entry));
|
||||
}
|
||||
return bindings;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,10 +17,16 @@ package org.thingsboard.server.extensions.core.filter;
|
||||
|
||||
import jdk.nashorn.api.scripting.NashornScriptEngineFactory;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
|
||||
|
||||
import javax.script.*;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
@ -28,6 +34,9 @@ import javax.script.*;
|
||||
@Slf4j
|
||||
public class NashornJsEvaluator {
|
||||
|
||||
public static final String CLIENT_SIDE = "cs";
|
||||
public static final String SERVER_SIDE = "ss";
|
||||
public static final String SHARED = "shared";
|
||||
private static NashornScriptEngineFactory factory = new NashornScriptEngineFactory();
|
||||
|
||||
private CompiledScript engine;
|
||||
@ -47,6 +56,65 @@ public class NashornJsEvaluator {
|
||||
}
|
||||
}
|
||||
|
||||
public static Bindings convertListEntries(Bindings bindings, String attributesVarName, Collection<AttributeKvEntry> attributes) {
|
||||
Map<String, Object> attrMap = new HashMap<>();
|
||||
for (AttributeKvEntry attr : attributes) {
|
||||
if (!CLIENT_SIDE.equalsIgnoreCase(attr.getKey()) && !SERVER_SIDE.equalsIgnoreCase(attr.getKey())
|
||||
&& !SHARED.equalsIgnoreCase(attr.getKey())) {
|
||||
bindings.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
attrMap.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
bindings.put(attributesVarName, attrMap);
|
||||
return bindings;
|
||||
}
|
||||
|
||||
public static Bindings updateBindings(Bindings bindings, UpdateAttributesRequest msg) {
|
||||
Map<String, Object> attrMap = (Map<String, Object>) bindings.get(CLIENT_SIDE);
|
||||
for (AttributeKvEntry attr : msg.getAttributes()) {
|
||||
if (!CLIENT_SIDE.equalsIgnoreCase(attr.getKey()) && !SERVER_SIDE.equalsIgnoreCase(attr.getKey())
|
||||
&& !SHARED.equalsIgnoreCase(attr.getKey())) {
|
||||
bindings.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
attrMap.put(attr.getKey(), getValue(attr));
|
||||
}
|
||||
bindings.put(CLIENT_SIDE, attrMap);
|
||||
return bindings;
|
||||
}
|
||||
|
||||
protected static Object getValue(KvEntry attr) {
|
||||
switch (attr.getDataType()) {
|
||||
case STRING:
|
||||
return attr.getStrValue().get();
|
||||
case LONG:
|
||||
return attr.getLongValue().get();
|
||||
case DOUBLE:
|
||||
return attr.getDoubleValue().get();
|
||||
case BOOLEAN:
|
||||
return attr.getBooleanValue().get();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Bindings toBindings(List<KvEntry> entries) {
|
||||
return toBindings(new SimpleBindings(), entries);
|
||||
}
|
||||
|
||||
public static Bindings toBindings(Bindings bindings, List<KvEntry> entries) {
|
||||
for (KvEntry entry : entries) {
|
||||
bindings.put(entry.getKey(), getValue(entry));
|
||||
}
|
||||
return bindings;
|
||||
}
|
||||
|
||||
public static Bindings getAttributeBindings(DeviceAttributes attributes) {
|
||||
Bindings bindings = new SimpleBindings();
|
||||
convertListEntries(bindings, CLIENT_SIDE, attributes.getClientSideAttributes());
|
||||
convertListEntries(bindings, SERVER_SIDE, attributes.getServerSideAttributes());
|
||||
convertListEntries(bindings, SHARED, attributes.getServerSidePublicAttributes());
|
||||
return bindings;
|
||||
}
|
||||
|
||||
public Boolean execute(Bindings bindings) throws ScriptException {
|
||||
Object eval = engine.eval(bindings);
|
||||
if (eval instanceof Boolean) {
|
||||
|
||||
@ -32,7 +32,7 @@ import java.util.Optional;
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
@Processor(name = "Alarm Deduplication Processor", descriptor = "AlarmDeduplicationProcessorDescriptor.json",
|
||||
@Processor(name = "(Deprecated) Alarm Deduplication Processor", descriptor = "AlarmDeduplicationProcessorDescriptor.json",
|
||||
configuration = AlarmDeduplicationProcessorConfiguration.class)
|
||||
@Slf4j
|
||||
public class AlarmDeduplicationProcessor extends SimpleRuleLifecycleComponent
|
||||
|
||||
@ -0,0 +1,213 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.extensions.core.processor;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.velocity.Template;
|
||||
import org.apache.velocity.VelocityContext;
|
||||
import org.apache.velocity.runtime.parser.ParseException;
|
||||
import org.thingsboard.server.common.data.alarm.Alarm;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
|
||||
import org.thingsboard.server.common.data.alarm.AlarmStatus;
|
||||
import org.thingsboard.server.common.data.kv.KvEntry;
|
||||
import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
|
||||
import org.thingsboard.server.common.msg.core.UpdateAttributesRequest;
|
||||
import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
|
||||
import org.thingsboard.server.common.msg.session.FromDeviceMsg;
|
||||
import org.thingsboard.server.extensions.api.component.Processor;
|
||||
import org.thingsboard.server.extensions.api.rules.*;
|
||||
import org.thingsboard.server.extensions.core.filter.NashornJsEvaluator;
|
||||
import org.thingsboard.server.extensions.core.utils.VelocityUtils;
|
||||
|
||||
import javax.script.Bindings;
|
||||
import javax.script.ScriptException;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
@Processor(name = "Alarm Processor", descriptor = "AlarmProcessorDescriptor.json",
|
||||
configuration = AlarmProcessorConfiguration.class)
|
||||
@Slf4j
|
||||
public class AlarmProcessor implements RuleProcessor<AlarmProcessorConfiguration> {
|
||||
|
||||
static final String IS_NEW_ALARM = "isNewAlarm";
|
||||
static final String IS_EXISTING_ALARM = "isExistingAlarm";
|
||||
static final String IS_CLEARED_ALARM = "isClearedAlarm";
|
||||
|
||||
protected NashornJsEvaluator newAlarmEvaluator;
|
||||
protected NashornJsEvaluator clearAlarmEvaluator;
|
||||
|
||||
private ObjectMapper mapper = new ObjectMapper();
|
||||
private AlarmProcessorConfiguration configuration;
|
||||
private AlarmStatus status;
|
||||
private AlarmSeverity severity;
|
||||
private Template alarmTypeTemplate;
|
||||
private Template alarmDetailsTemplate;
|
||||
|
||||
|
||||
@Override
|
||||
public void init(AlarmProcessorConfiguration configuration) {
|
||||
this.configuration = configuration;
|
||||
try {
|
||||
this.alarmTypeTemplate = VelocityUtils.create(configuration.getAlarmTypeTemplate(), "Alarm Type Template");
|
||||
this.alarmDetailsTemplate = VelocityUtils.create(configuration.getAlarmDetailsTemplate(), "Alarm Details Template");
|
||||
this.status = AlarmStatus.valueOf(configuration.getAlarmStatus());
|
||||
this.severity = AlarmSeverity.valueOf(configuration.getAlarmSeverity());
|
||||
initEvaluators();
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to create templates based on provided configuration!", e);
|
||||
throw new RuntimeException("Failed to create templates based on provided configuration!", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resume() {
|
||||
initEvaluators();
|
||||
log.debug("Resume method was called, but no impl provided!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void suspend() {
|
||||
destroyEvaluators();
|
||||
log.debug("Suspend method was called, but no impl provided!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
destroyEvaluators();
|
||||
log.debug("Stop method was called, but no impl provided!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuleProcessingMetaData process(RuleContext ctx, ToDeviceActorMsg wrapper) throws RuleException {
|
||||
RuleProcessingMetaData md = new RuleProcessingMetaData();
|
||||
|
||||
FromDeviceMsg msg = wrapper.getPayload();
|
||||
Bindings bindings = buildBindings(ctx, msg);
|
||||
|
||||
boolean isActiveAlarm;
|
||||
boolean isClearedAlarm;
|
||||
|
||||
try {
|
||||
isActiveAlarm = newAlarmEvaluator.execute(bindings);
|
||||
isClearedAlarm = clearAlarmEvaluator.execute(bindings);
|
||||
} catch (ScriptException e) {
|
||||
log.debug("[{}] Failed to evaluate alarm expressions!", ctx.getRuleId(), e);
|
||||
throw new RuleException("Failed to evaluate alarm expressions!", e);
|
||||
}
|
||||
|
||||
if (!isActiveAlarm && !isClearedAlarm) {
|
||||
log.debug("[{}] Incoming message do not trigger alarm", ctx.getRuleId());
|
||||
return md;
|
||||
}
|
||||
|
||||
Alarm existing = null;
|
||||
if (isActiveAlarm) {
|
||||
Alarm alarm = buildAlarm(ctx, msg);
|
||||
existing = ctx.createOrUpdateAlarm(alarm);
|
||||
if (existing.getStartTs() == alarm.getStartTs()) {
|
||||
log.debug("[{}][{}] New Active Alarm detected");
|
||||
md.put(IS_NEW_ALARM, Boolean.TRUE);
|
||||
} else {
|
||||
log.debug("[{}][{}] Existing Active Alarm detected");
|
||||
md.put(IS_EXISTING_ALARM, Boolean.TRUE);
|
||||
}
|
||||
} else if (isClearedAlarm) {
|
||||
VelocityContext context = VelocityUtils.createContext(ctx.getDeviceMetaData(), msg);
|
||||
String alarmType = VelocityUtils.merge(alarmTypeTemplate, context);
|
||||
Optional<Alarm> alarm = ctx.findLatestAlarm(ctx.getDeviceMetaData().getDeviceId(), alarmType);
|
||||
if (alarm.isPresent()) {
|
||||
ctx.clearAlarm(alarm.get().getId(), System.currentTimeMillis());
|
||||
log.debug("[{}][{}] Existing Active Alarm cleared");
|
||||
md.put(IS_CLEARED_ALARM, Boolean.TRUE);
|
||||
existing = alarm.get();
|
||||
}
|
||||
}
|
||||
//TODO: handle cleared alarms
|
||||
|
||||
if (existing != null) {
|
||||
md.put("alarmId", existing.getId().getId());
|
||||
md.put("alarmType", existing.getType());
|
||||
md.put("alarmSeverity", existing.getSeverity());
|
||||
try {
|
||||
md.put("alarmDetails", mapper.writeValueAsString(existing.getDetails()));
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuleException("Failed to serialize alarm details", e);
|
||||
}
|
||||
}
|
||||
|
||||
return md;
|
||||
}
|
||||
|
||||
private Alarm buildAlarm(RuleContext ctx, FromDeviceMsg msg) throws RuleException {
|
||||
VelocityContext context = VelocityUtils.createContext(ctx.getDeviceMetaData(), msg);
|
||||
String alarmType = VelocityUtils.merge(alarmTypeTemplate, context);
|
||||
String alarmDetails = VelocityUtils.merge(alarmDetailsTemplate, context);
|
||||
|
||||
Alarm alarm = new Alarm();
|
||||
alarm.setOriginator(ctx.getDeviceMetaData().getDeviceId());
|
||||
alarm.setType(alarmType);
|
||||
|
||||
alarm.setStatus(status);
|
||||
alarm.setSeverity(severity);
|
||||
alarm.setPropagate(configuration.isAlarmPropagateFlag());
|
||||
|
||||
try {
|
||||
alarm.setDetails(mapper.readTree(alarmDetails));
|
||||
} catch (IOException e) {
|
||||
log.debug("[{}] Failed to parse alarm details {} as json string after evaluation.", ctx.getRuleId(), e);
|
||||
throw new RuleException("Failed to parse alarm details as json string after evaluation!", e);
|
||||
}
|
||||
return alarm;
|
||||
}
|
||||
|
||||
private Bindings buildBindings(RuleContext ctx, FromDeviceMsg msg) {
|
||||
Bindings bindings = NashornJsEvaluator.getAttributeBindings(ctx.getDeviceMetaData().getDeviceAttributes());
|
||||
if (msg != null) {
|
||||
switch (msg.getMsgType()) {
|
||||
case POST_ATTRIBUTES_REQUEST:
|
||||
bindings = NashornJsEvaluator.updateBindings(bindings, (UpdateAttributesRequest) msg);
|
||||
break;
|
||||
case POST_TELEMETRY_REQUEST:
|
||||
TelemetryUploadRequest telemetryMsg = (TelemetryUploadRequest) msg;
|
||||
for (List<KvEntry> entries : telemetryMsg.getData().values()) {
|
||||
bindings = NashornJsEvaluator.toBindings(bindings, entries);
|
||||
}
|
||||
}
|
||||
}
|
||||
return bindings;
|
||||
}
|
||||
|
||||
private void initEvaluators() {
|
||||
newAlarmEvaluator = new NashornJsEvaluator(configuration.getNewAlarmExpression());
|
||||
clearAlarmEvaluator = new NashornJsEvaluator(configuration.getClearAlarmExpression());
|
||||
}
|
||||
|
||||
private void destroyEvaluators() {
|
||||
if (newAlarmEvaluator != null) {
|
||||
newAlarmEvaluator.destroy();
|
||||
}
|
||||
if (clearAlarmEvaluator != null) {
|
||||
clearAlarmEvaluator.destroy();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,38 @@
|
||||
/**
|
||||
* Copyright © 2016-2017 The Thingsboard Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.thingsboard.server.extensions.core.processor;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author Andrew Shvayka
|
||||
*/
|
||||
@Data
|
||||
public class AlarmProcessorConfiguration {
|
||||
|
||||
private String newAlarmExpression;
|
||||
private String clearAlarmExpression;
|
||||
|
||||
private String alarmTypeTemplate;
|
||||
private String alarmSeverity;
|
||||
private String alarmStatus;
|
||||
private boolean alarmPropagateFlag;
|
||||
|
||||
private String alarmDetailsTemplate;
|
||||
|
||||
}
|
||||
@ -29,7 +29,7 @@ import org.thingsboard.server.common.msg.session.FromDeviceMsg;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceAttributes;
|
||||
import org.thingsboard.server.extensions.api.device.DeviceMetaData;
|
||||
import org.thingsboard.server.extensions.api.rules.RuleProcessingMetaData;
|
||||
import org.thingsboard.server.extensions.core.filter.DeviceAttributesFilter;
|
||||
import org.thingsboard.server.extensions.core.filter.NashornJsEvaluator;
|
||||
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
@ -70,9 +70,9 @@ public class VelocityUtils {
|
||||
context.put("date", new DateTool());
|
||||
DeviceAttributes deviceAttributes = deviceMetaData.getDeviceAttributes();
|
||||
|
||||
pushAttributes(context, deviceAttributes.getClientSideAttributes(), DeviceAttributesFilter.CLIENT_SIDE);
|
||||
pushAttributes(context, deviceAttributes.getServerSideAttributes(), DeviceAttributesFilter.SERVER_SIDE);
|
||||
pushAttributes(context, deviceAttributes.getServerSidePublicAttributes(), DeviceAttributesFilter.SHARED);
|
||||
pushAttributes(context, deviceAttributes.getClientSideAttributes(), NashornJsEvaluator.CLIENT_SIDE);
|
||||
pushAttributes(context, deviceAttributes.getServerSideAttributes(), NashornJsEvaluator.SERVER_SIDE);
|
||||
pushAttributes(context, deviceAttributes.getServerSidePublicAttributes(), NashornJsEvaluator.SHARED);
|
||||
|
||||
switch (payload.getMsgType()) {
|
||||
case POST_TELEMETRY_REQUEST:
|
||||
|
||||
113
extensions-core/src/main/resources/AlarmProcessorDescriptor.json
Normal file
113
extensions-core/src/main/resources/AlarmProcessorDescriptor.json
Normal file
@ -0,0 +1,113 @@
|
||||
{
|
||||
"schema": {
|
||||
"title": "Alarm Configuration",
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"newAlarmExpression": {
|
||||
"title": "Alarm trigger expression",
|
||||
"type": "string",
|
||||
"default": ""
|
||||
},
|
||||
"clearAlarmExpression": {
|
||||
"title": "Alarm clear expression",
|
||||
"type": "string",
|
||||
"default": ""
|
||||
},
|
||||
"alarmTypeTemplate": {
|
||||
"title": "Alarm type",
|
||||
"type": "string"
|
||||
},
|
||||
"alarmSeverity": {
|
||||
"title": "Severity",
|
||||
"type": "string"
|
||||
},
|
||||
"alarmStatus": {
|
||||
"title": "Status",
|
||||
"type": "string"
|
||||
},
|
||||
"alarmPropagateFlag": {
|
||||
"title": "Propagate Alarm",
|
||||
"type": "boolean"
|
||||
},
|
||||
"alarmDetailsTemplate": {
|
||||
"title": "Alarm details",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"newAlarmExpression",
|
||||
"clearAlarmExpression",
|
||||
"alarmSeverity",
|
||||
"alarmStatus",
|
||||
"alarmTypeTemplate",
|
||||
"alarmDetailsTemplate"
|
||||
]
|
||||
},
|
||||
"form": [
|
||||
{
|
||||
"key": "newAlarmExpression",
|
||||
"type": "javascript"
|
||||
},
|
||||
{
|
||||
"key": "clearAlarmExpression",
|
||||
"type": "javascript"
|
||||
},
|
||||
{
|
||||
"key": "alarmSeverity",
|
||||
"type": "rc-select",
|
||||
"multiple": false,
|
||||
"items": [
|
||||
{
|
||||
"value": "CRITICAL",
|
||||
"label": "Critical"
|
||||
},
|
||||
{
|
||||
"value": "MAJOR",
|
||||
"label": "Major"
|
||||
},
|
||||
{
|
||||
"value": "MINOR",
|
||||
"label": "Minor"
|
||||
},
|
||||
{
|
||||
"value": "WARNING",
|
||||
"label": "Warning"
|
||||
},
|
||||
{
|
||||
"value": "INDETERMINATE",
|
||||
"label": "Indeterminate"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"key": "alarmStatus",
|
||||
"type": "rc-select",
|
||||
"multiple": false,
|
||||
"items": [
|
||||
{
|
||||
"value": "ACTIVE_UNACK",
|
||||
"label": "Active Unacknowledged"
|
||||
},
|
||||
{
|
||||
"value": "ACTIVE_ACK",
|
||||
"label": "Active Acknowledged"
|
||||
},
|
||||
{
|
||||
"value": "CLEARED_UNACK",
|
||||
"label": "Cleared Unacknowledged"
|
||||
},
|
||||
{
|
||||
"value": "CLEARED_ACK",
|
||||
"label": "Cleared Acknowledged"
|
||||
}
|
||||
]
|
||||
},
|
||||
"alarmTypeTemplate",
|
||||
"alarmPropagateFlag",
|
||||
{
|
||||
"key": "alarmDetailsTemplate",
|
||||
"type": "textarea",
|
||||
"rows": 5
|
||||
}
|
||||
]
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user