Clear alarm node: async processing
This commit is contained in:
parent
6b810703df
commit
171e824684
@ -16,9 +16,7 @@
|
||||
package org.thingsboard.rule.engine.action;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.rule.engine.api.RuleNode;
|
||||
import org.thingsboard.rule.engine.api.TbContext;
|
||||
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
|
||||
@ -31,18 +29,21 @@ import org.thingsboard.server.common.data.id.AlarmId;
|
||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||
import org.thingsboard.server.common.msg.TbMsg;
|
||||
|
||||
@Slf4j
|
||||
import static com.google.common.util.concurrent.Futures.immediateFuture;
|
||||
import static com.google.common.util.concurrent.Futures.transform;
|
||||
import static com.google.common.util.concurrent.Futures.transformAsync;
|
||||
|
||||
@RuleNode(
|
||||
type = ComponentType.ACTION,
|
||||
name = "clear alarm", relationTypes = {"Cleared", "False"},
|
||||
configClazz = TbClearAlarmNodeConfiguration.class,
|
||||
nodeDescription = "Clear Alarm",
|
||||
nodeDetails =
|
||||
"Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.\n" +
|
||||
"Node output:\n" +
|
||||
"If alarm was not cleared, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'metadata' will contains 'isClearedAlarm' property. " +
|
||||
"Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>. " +
|
||||
"Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.",
|
||||
nodeDetails = """
|
||||
Details - JS function that creates JSON object based on incoming message. This object will be added into Alarm.details field.
|
||||
Node output:
|
||||
If alarm was not cleared, original message is returned. Otherwise new Message returned with type 'ALARM', Alarm object in 'msg' property and 'metadata' will contains 'isClearedAlarm' property.
|
||||
Message payload can be accessed via <code>msg</code> property. For example <code>'temperature = ' + msg.temperature ;</code>.
|
||||
Message metadata can be accessed via <code>metadata</code> property. For example <code>'name = ' + metadata.customerName;</code>.""",
|
||||
configDirective = "tbActionNodeClearAlarmConfig",
|
||||
icon = "notifications_off"
|
||||
)
|
||||
@ -55,22 +56,26 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
|
||||
|
||||
@Override
|
||||
protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
|
||||
String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
|
||||
Alarm alarm;
|
||||
if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
|
||||
alarm = ctx.getAlarmService().findAlarmById(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
|
||||
String alarmType = TbNodeUtils.processPattern(config.getAlarmType(), msg);
|
||||
|
||||
ListenableFuture<Alarm> alarmFuture;
|
||||
if (msg.getOriginator().getEntityType() == EntityType.ALARM) {
|
||||
alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
|
||||
} else {
|
||||
alarm = ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
|
||||
alarmFuture = ctx.getAlarmService().findLatestActiveByOriginatorAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), alarmType);
|
||||
}
|
||||
if (alarm != null && !alarm.getStatus().isCleared()) {
|
||||
return clearAlarm(ctx, msg, alarm);
|
||||
}
|
||||
return Futures.immediateFuture(new TbAlarmResult(false, false, false, null));
|
||||
|
||||
return transformAsync(alarmFuture, alarm -> {
|
||||
if (alarm != null && !alarm.getStatus().isCleared()) {
|
||||
return clearAlarmAsync(ctx, msg, alarm);
|
||||
}
|
||||
return immediateFuture(new TbAlarmResult(false, false, false, null));
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
private ListenableFuture<TbAlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
|
||||
private ListenableFuture<TbAlarmResult> clearAlarmAsync(TbContext ctx, TbMsg msg, Alarm alarm) {
|
||||
ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(msg, alarm.getDetails());
|
||||
return Futures.transform(asyncDetails, details -> {
|
||||
return transform(asyncDetails, details -> {
|
||||
AlarmApiCallResult result = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), System.currentTimeMillis(), details);
|
||||
if (result.isSuccessful()) {
|
||||
return new TbAlarmResult(false, false, result.isCleared(), result.getAlarm());
|
||||
@ -79,4 +84,5 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
|
||||
}
|
||||
}, ctx.getDbCallbackExecutor());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.FluentFuture;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
@ -49,6 +49,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import static com.google.common.util.concurrent.Futures.immediateFuture;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
@ -112,8 +113,8 @@ class TbClearAlarmNodeTest {
|
||||
.endTs(oldEndDate)
|
||||
.build();
|
||||
|
||||
when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
|
||||
when(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, "SomeType")).thenReturn(activeAlarm);
|
||||
when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(immediateFuture(null));
|
||||
when(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, "SomeType")).thenReturn(FluentFuture.from(immediateFuture(activeAlarm)));
|
||||
when(alarmServiceMock.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), anyLong(), nullable(JsonNode.class)))
|
||||
.thenReturn(AlarmApiCallResult.builder()
|
||||
.successful(true)
|
||||
@ -172,8 +173,8 @@ class TbClearAlarmNodeTest {
|
||||
.build();
|
||||
expectedAlarm.setId(id);
|
||||
|
||||
when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
|
||||
when(alarmServiceMock.findAlarmById(tenantId, id)).thenReturn(activeAlarm);
|
||||
when(alarmDetailsScriptMock.executeJsonAsync(msg)).thenReturn(immediateFuture(null));
|
||||
when(alarmServiceMock.findAlarmByIdAsync(tenantId, id)).thenReturn(immediateFuture(activeAlarm));
|
||||
when(alarmServiceMock.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), anyLong(), nullable(JsonNode.class)))
|
||||
.thenReturn(AlarmApiCallResult.builder()
|
||||
.successful(true)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user