Alarm node changed to read alarm config from message (#1396)
* Alarm node changed to read alarm config from message * Alarm node changed to read alarm config from message * Alarm node changed to read alarm config from message
This commit is contained in:
		
							parent
							
								
									5aced98ce3
								
							
						
					
					
						commit
						9d23930b19
					
				@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.rule.engine.action;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.fasterxml.jackson.databind.ObjectMapper;
 | 
			
		||||
import com.google.common.base.Function;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
@ -31,6 +32,8 @@ import org.thingsboard.server.common.data.id.TenantId;
 | 
			
		||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
			
		||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
 | 
			
		||||
import java.io.IOException;
 | 
			
		||||
 | 
			
		||||
@Slf4j
 | 
			
		||||
@RuleNode(
 | 
			
		||||
        type = ComponentType.ACTION,
 | 
			
		||||
@ -49,6 +52,8 @@ import org.thingsboard.server.common.msg.TbMsg;
 | 
			
		||||
)
 | 
			
		||||
public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConfiguration> {
 | 
			
		||||
 | 
			
		||||
    private static ObjectMapper mapper = new ObjectMapper();
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected TbCreateAlarmNodeConfiguration loadAlarmNodeConfig(TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        return TbNodeUtils.convert(configuration, TbCreateAlarmNodeConfiguration.class);
 | 
			
		||||
@ -56,32 +61,59 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), config.getAlarmType());
 | 
			
		||||
        return Futures.transformAsync(latest, a -> {
 | 
			
		||||
            if (a == null || a.getStatus().isCleared()) {
 | 
			
		||||
                return createNewAlarm(ctx, msg);
 | 
			
		||||
        String alarmType;
 | 
			
		||||
        final Alarm msgAlarm;
 | 
			
		||||
 | 
			
		||||
        if (!config.isUseMessageAlarmData()) {
 | 
			
		||||
            alarmType = config.getAlarmType();
 | 
			
		||||
            msgAlarm = null;
 | 
			
		||||
        } else {
 | 
			
		||||
            try {
 | 
			
		||||
                msgAlarm = mapper.readValue(msg.getData(), Alarm.class);
 | 
			
		||||
                msgAlarm.setTenantId(ctx.getTenantId());
 | 
			
		||||
                alarmType = msgAlarm.getType();
 | 
			
		||||
            } catch (IOException e) {
 | 
			
		||||
                ctx.tellFailure(msg, e);
 | 
			
		||||
                return null;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        ListenableFuture<Alarm> latest = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
 | 
			
		||||
        return Futures.transformAsync(latest, existingAlarm -> {
 | 
			
		||||
            if (existingAlarm == null || existingAlarm.getStatus().isCleared()) {
 | 
			
		||||
                return createNewAlarm(ctx, msg, msgAlarm);
 | 
			
		||||
            } else {
 | 
			
		||||
                return updateAlarm(ctx, msg, a);
 | 
			
		||||
                return updateAlarm(ctx, msg, existingAlarm, msgAlarm);
 | 
			
		||||
            }
 | 
			
		||||
        }, ctx.getDbCallbackExecutor());
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg) {
 | 
			
		||||
        ListenableFuture<Alarm> asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null),
 | 
			
		||||
                details -> buildAlarm(msg, details, ctx.getTenantId()));
 | 
			
		||||
    private ListenableFuture<AlarmResult> createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) {
 | 
			
		||||
        ListenableFuture<Alarm> asyncAlarm;
 | 
			
		||||
        if (msgAlarm != null ) {
 | 
			
		||||
            asyncAlarm = Futures.immediateCheckedFuture(msgAlarm);
 | 
			
		||||
        } else {
 | 
			
		||||
            asyncAlarm = Futures.transform(buildAlarmDetails(ctx, msg, null),
 | 
			
		||||
                    details -> buildAlarm(msg, details, ctx.getTenantId()));
 | 
			
		||||
        }
 | 
			
		||||
        ListenableFuture<Alarm> asyncCreated = Futures.transform(asyncAlarm,
 | 
			
		||||
                alarm -> ctx.getAlarmService().createOrUpdateAlarm(alarm), ctx.getDbCallbackExecutor());
 | 
			
		||||
        return Futures.transform(asyncCreated, alarm -> new AlarmResult(true, false, false, alarm));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
 | 
			
		||||
        ListenableFuture<Alarm> asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg, alarm.getDetails()), (Function<JsonNode, Alarm>) details -> {
 | 
			
		||||
            alarm.setSeverity(config.getSeverity());
 | 
			
		||||
            alarm.setPropagate(config.isPropagate());
 | 
			
		||||
            alarm.setDetails(details);
 | 
			
		||||
            alarm.setEndTs(System.currentTimeMillis());
 | 
			
		||||
            return ctx.getAlarmService().createOrUpdateAlarm(alarm);
 | 
			
		||||
    private ListenableFuture<AlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Alarm existingAlarm, Alarm msgAlarm) {
 | 
			
		||||
        ListenableFuture<Alarm> asyncUpdated = Futures.transform(buildAlarmDetails(ctx, msg, existingAlarm.getDetails()), (Function<JsonNode, Alarm>) details -> {
 | 
			
		||||
            if (msgAlarm != null) {
 | 
			
		||||
                existingAlarm.setSeverity(msgAlarm.getSeverity());
 | 
			
		||||
                existingAlarm.setPropagate(msgAlarm.isPropagate());
 | 
			
		||||
            } else {
 | 
			
		||||
                existingAlarm.setSeverity(config.getSeverity());
 | 
			
		||||
                existingAlarm.setPropagate(config.isPropagate());
 | 
			
		||||
            }
 | 
			
		||||
            existingAlarm.setDetails(details);
 | 
			
		||||
            existingAlarm.setEndTs(System.currentTimeMillis());
 | 
			
		||||
            return ctx.getAlarmService().createOrUpdateAlarm(existingAlarm);
 | 
			
		||||
        }, ctx.getDbCallbackExecutor());
 | 
			
		||||
 | 
			
		||||
        return Futures.transform(asyncUpdated, a -> new AlarmResult(false, true, false, a));
 | 
			
		||||
 | 
			
		||||
@ -24,6 +24,7 @@ public class TbCreateAlarmNodeConfiguration extends TbAbstractAlarmNodeConfigura
 | 
			
		||||
 | 
			
		||||
    private AlarmSeverity severity;
 | 
			
		||||
    private boolean propagate;
 | 
			
		||||
    private boolean useMessageAlarmData;
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public TbCreateAlarmNodeConfiguration defaultConfiguration() {
 | 
			
		||||
@ -36,6 +37,7 @@ public class TbCreateAlarmNodeConfiguration extends TbAbstractAlarmNodeConfigura
 | 
			
		||||
        configuration.setAlarmType("General Alarm");
 | 
			
		||||
        configuration.setSeverity(AlarmSeverity.CRITICAL);
 | 
			
		||||
        configuration.setPropagate(false);
 | 
			
		||||
        configuration.setUseMessageAlarmData(false);
 | 
			
		||||
        return configuration;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user