Merge pull request #13987 from dskarzh/rule-node/clear-alarm/async-processing
Clear alarm node: async processing
This commit is contained in:
		
						commit
						3a7d1957f9
					
				@ -16,9 +16,7 @@
 | 
				
			|||||||
package org.thingsboard.rule.engine.action;
 | 
					package org.thingsboard.rule.engine.action;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
					import com.fasterxml.jackson.databind.JsonNode;
 | 
				
			||||||
import com.google.common.util.concurrent.Futures;
 | 
					 | 
				
			||||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
					import com.google.common.util.concurrent.ListenableFuture;
 | 
				
			||||||
import lombok.extern.slf4j.Slf4j;
 | 
					 | 
				
			||||||
import org.thingsboard.rule.engine.api.RuleNode;
 | 
					import org.thingsboard.rule.engine.api.RuleNode;
 | 
				
			||||||
import org.thingsboard.rule.engine.api.TbContext;
 | 
					import org.thingsboard.rule.engine.api.TbContext;
 | 
				
			||||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
					import org.thingsboard.rule.engine.api.TbNodeConfiguration;
 | 
				
			||||||
@ -31,18 +29,21 @@ import org.thingsboard.server.common.data.id.AlarmId;
 | 
				
			|||||||
import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
					import org.thingsboard.server.common.data.plugin.ComponentType;
 | 
				
			||||||
import org.thingsboard.server.common.msg.TbMsg;
 | 
					import org.thingsboard.server.common.msg.TbMsg;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@Slf4j
 | 
					import static com.google.common.util.concurrent.Futures.immediateFuture;
 | 
				
			||||||
 | 
					import static com.google.common.util.concurrent.Futures.transform;
 | 
				
			||||||
 | 
					import static com.google.common.util.concurrent.Futures.transformAsync;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@RuleNode(
 | 
					@RuleNode(
 | 
				
			||||||
        type = ComponentType.ACTION,
 | 
					        type = ComponentType.ACTION,
 | 
				
			||||||
        name = "clear alarm", relationTypes = {"Cleared", "False"},
 | 
					        name = "clear alarm", relationTypes = {"Cleared", "False"},
 | 
				
			||||||
        configClazz = TbClearAlarmNodeConfiguration.class,
 | 
					        configClazz = TbClearAlarmNodeConfiguration.class,
 | 
				
			||||||
        nodeDescription = "Clear Alarm",
 | 
					        nodeDescription = "Clear Alarm",
 | 
				
			||||||
        nodeDetails =
 | 
					        nodeDetails = """
 | 
				
			||||||
                "Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.\n" +
 | 
					                Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.
 | 
				
			||||||
                        "Node output:\n" +
 | 
					                Node output:
 | 
				
			||||||
                        "If alarm was not cleared, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'metadata' will contains 'isClearedAlarm' property. " +
 | 
					                If alarm was not cleared, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'metadata' will contains 'isClearedAlarm' property.
 | 
				
			||||||
                        "Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>. " +
 | 
					                Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>.
 | 
				
			||||||
                        "Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.",
 | 
					                Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.""",
 | 
				
			||||||
        configDirective = "tbActionNodeClearAlarmConfig",
 | 
					        configDirective = "tbActionNodeClearAlarmConfig",
 | 
				
			||||||
        icon = "notifications_off"
 | 
					        icon = "notifications_off"
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@ -55,22 +56,26 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @Override
 | 
					    @Override
 | 
				
			||||||
    protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
 | 
					    protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
 | 
				
			||||||
        String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
 | 
					        String alarmType = TbNodeUtils.processPattern(config.getAlarmType(), msg);
 | 
				
			||||||
        Alarm alarm;
 | 
					
 | 
				
			||||||
        if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
 | 
					        ListenableFuture<Alarm> alarmFuture;
 | 
				
			||||||
            alarm = ctx.getAlarmService().findAlarmById(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
 | 
					        if (msg.getOriginator().getEntityType() == EntityType.ALARM) {
 | 
				
			||||||
 | 
					            alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
 | 
				
			||||||
        } else {
 | 
					        } else {
 | 
				
			||||||
            alarm = ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
 | 
					            alarmFuture = ctx.getAlarmService().findLatestActiveByOriginatorAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), alarmType);
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        if (alarm != null && !alarm.getStatus().isCleared()) {
 | 
					
 | 
				
			||||||
            return clearAlarm(ctx, msg, alarm);
 | 
					        return transformAsync(alarmFuture, alarm -> {
 | 
				
			||||||
        }
 | 
					            if (alarm != null && !alarm.getStatus().isCleared()) {
 | 
				
			||||||
        return Futures.immediateFuture(new TbAlarmResult(false, false, false, null));
 | 
					                return clearAlarmAsync(ctx, msg, alarm);
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					            return immediateFuture(new TbAlarmResult(false, false, false, null));
 | 
				
			||||||
 | 
					        }, ctx.getDbCallbackExecutor());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    private ListenableFuture<TbAlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
 | 
					    private ListenableFuture<TbAlarmResult> clearAlarmAsync(TbContext ctx, TbMsg msg, Alarm alarm) {
 | 
				
			||||||
        ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(msg, alarm.getDetails());
 | 
					        ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(msg, alarm.getDetails());
 | 
				
			||||||
        return Futures.transform(asyncDetails, details -> {
 | 
					        return transform(asyncDetails, details -> {
 | 
				
			||||||
            AlarmApiCallResult result = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), System.currentTimeMillis(), details);
 | 
					            AlarmApiCallResult result = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), System.currentTimeMillis(), details);
 | 
				
			||||||
            if (result.isSuccessful()) {
 | 
					            if (result.isSuccessful()) {
 | 
				
			||||||
                return new TbAlarmResult(false, false, result.isCleared(), result.getAlarm());
 | 
					                return new TbAlarmResult(false, false, result.isCleared(), result.getAlarm());
 | 
				
			||||||
@ -79,4 +84,5 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
 | 
				
			|||||||
            }
 | 
					            }
 | 
				
			||||||
        }, ctx.getDbCallbackExecutor());
 | 
					        }, ctx.getDbCallbackExecutor());
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
					import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
				
			||||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
					import com.fasterxml.jackson.databind.JsonNode;
 | 
				
			||||||
import com.google.common.util.concurrent.Futures;
 | 
					import com.google.common.util.concurrent.FluentFuture;
 | 
				
			||||||
import org.junit.jupiter.api.BeforeEach;
 | 
					import org.junit.jupiter.api.BeforeEach;
 | 
				
			||||||
import org.junit.jupiter.api.Test;
 | 
					import org.junit.jupiter.api.Test;
 | 
				
			||||||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
					import org.junit.jupiter.api.extension.ExtendWith;
 | 
				
			||||||
@ -49,6 +49,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import java.util.function.Consumer;
 | 
					import java.util.function.Consumer;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import static com.google.common.util.concurrent.Futures.immediateFuture;
 | 
				
			||||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
					import static org.assertj.core.api.Assertions.assertThat;
 | 
				
			||||||
import static org.mockito.ArgumentMatchers.any;
 | 
					import static org.mockito.ArgumentMatchers.any;
 | 
				
			||||||
import static org.mockito.ArgumentMatchers.anyLong;
 | 
					import static org.mockito.ArgumentMatchers.anyLong;
 | 
				
			||||||
@ -112,8 +113,8 @@ class TbClearAlarmNodeTest {
 | 
				
			|||||||
                .endTs(oldEndDate)
 | 
					                .endTs(oldEndDate)
 | 
				
			||||||
                .build();
 | 
					                .build();
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
 | 
					        when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(immediateFuture(null));
 | 
				
			||||||
        when(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, "SomeType")).thenReturn(activeAlarm);
 | 
					        when(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, "SomeType")).thenReturn(FluentFuture.from(immediateFuture(activeAlarm)));
 | 
				
			||||||
        when(alarmServiceMock.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), anyLong(), nullable(JsonNode.class)))
 | 
					        when(alarmServiceMock.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), anyLong(), nullable(JsonNode.class)))
 | 
				
			||||||
                .thenReturn(AlarmApiCallResult.builder()
 | 
					                .thenReturn(AlarmApiCallResult.builder()
 | 
				
			||||||
                        .successful(true)
 | 
					                        .successful(true)
 | 
				
			||||||
@ -172,8 +173,8 @@ class TbClearAlarmNodeTest {
 | 
				
			|||||||
                .build();
 | 
					                .build();
 | 
				
			||||||
        expectedAlarm.setId(id);
 | 
					        expectedAlarm.setId(id);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
 | 
					        when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(immediateFuture(null));
 | 
				
			||||||
        when(alarmServiceMock.findAlarmById(tenantId, id)).thenReturn(activeAlarm);
 | 
					        when(alarmServiceMock.findAlarmByIdAsync(tenantId, id)).thenReturn(immediateFuture(activeAlarm));
 | 
				
			||||||
        when(alarmServiceMock.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), anyLong(), nullable(JsonNode.class)))
 | 
					        when(alarmServiceMock.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), anyLong(), nullable(JsonNode.class)))
 | 
				
			||||||
                .thenReturn(AlarmApiCallResult.builder()
 | 
					                .thenReturn(AlarmApiCallResult.builder()
 | 
				
			||||||
                        .successful(true)
 | 
					                        .successful(true)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user