Create alarm node: process message asynchronously to avoid blocking dispatcher thread

This commit is contained in:
Dmytro Skarzhynets 2025-07-18 17:47:04 +03:00
parent aed4af9477
commit b323bdc5d0
No known key found for this signature in database
GPG Key ID: 2B51652F224037DF
8 changed files with 83 additions and 63 deletions

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.service.telemetry;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -171,6 +172,11 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
return alarmService.findLatestActiveByOriginatorAndType(tenantId, originator, type);
}
@Override
public FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
return alarmService.findLatestActiveByOriginatorAndTypeAsync(tenantId, originator, type);
}
@Override
public Alarm findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type) {
return alarmService.findLatestActiveByOriginatorAndType(tenantId, originator, type);

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.dao.alarm;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.alarm.Alarm;
@ -105,6 +106,8 @@ public interface AlarmService extends EntityDaoService {
Alarm findLatestActiveByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
AlarmDataQuery query, Collection<EntityId> orderedEntityIds);

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.dao.alarm;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntitySubtype;
import org.thingsboard.server.common.data.alarm.Alarm;
@ -47,15 +48,14 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
/**
* Created by ashvayka on 11.05.17.
*/
public interface AlarmDao extends Dao<Alarm> {
Alarm findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
Alarm findLatestActiveByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
ListenableFuture<Alarm> findLatestByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
Alarm findAlarmById(TenantId tenantId, UUID key);

View File

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.alarm;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -169,6 +170,11 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
return alarmDao.findLatestActiveByOriginatorAndType(tenantId, originator, type);
}
@Override
public FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
return alarmDao.findLatestActiveByOriginatorAndTypeAsync(tenantId, originator, type);
}
@Override
public PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
AlarmDataQuery query, Collection<EntityId> orderedEntityIds) {

View File

@ -16,6 +16,7 @@
package org.thingsboard.server.dao.sql.alarm;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@ -79,9 +80,6 @@ import static org.thingsboard.server.common.data.page.SortOrder.Direction.ASC;
import static org.thingsboard.server.dao.DaoUtil.convertTenantEntityTypesToDto;
import static org.thingsboard.server.dao.DaoUtil.toPageable;
/**
* Created by Valerii Sosliuk on 5/19/2017.
*/
@Slf4j
@Component
@SqlDao
@ -124,6 +122,11 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
return latest.isEmpty() ? null : DaoUtil.getData(latest.get(0));
}
@Override
public FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
return FluentFuture.from(service.submit(() -> findLatestActiveByOriginatorAndType(tenantId, originator, type)));
}
@Override
public ListenableFuture<Alarm> findLatestByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
return service.submit(() -> findLatestByOriginatorAndType(tenantId, originator, type));

View File

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.api;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntitySubtype;
@ -78,6 +79,8 @@ public interface RuleEngineAlarmService {
Alarm findLatestActiveByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
Alarm findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
AlarmInfo findAlarmInfoById(TenantId tenantId, AlarmId alarmId);

View File

@ -70,7 +70,6 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
}
}
@Override
protected TbCreateAlarmNodeConfiguration loadAlarmNodeConfig(TbNodeConfiguration configuration) throws TbNodeException {
TbCreateAlarmNodeConfiguration nodeConfiguration = TbNodeUtils.convert(configuration, TbCreateAlarmNodeConfiguration.class);
@ -96,12 +95,13 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
}
}
Alarm existingAlarm = ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
if (existingAlarm == null || existingAlarm.getStatus().isCleared()) {
return createNewAlarm(ctx, msg, msgAlarm);
} else {
return updateAlarm(ctx, msg, existingAlarm, msgAlarm);
}
return ctx.getAlarmService().findLatestActiveByOriginatorAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), alarmType).transformAsync(existingAlarm -> {
if (existingAlarm == null || existingAlarm.getStatus().isCleared()) {
return createNewAlarm(ctx, msg, msgAlarm);
} else {
return updateAlarm(ctx, msg, existingAlarm, msgAlarm);
}
}, ctx.getDbCallbackExecutor());
}
private Alarm getAlarmFromMessage(TbContext ctx, TbMsg msg) throws IOException {

View File

@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action;
import com.datastax.oss.driver.api.core.uuid.Uuids;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.FluentFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
@ -60,6 +60,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@ -67,6 +69,7 @@ import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
@ -99,6 +102,8 @@ class TbCreateAlarmNodeTest {
dbExecutor = new TestDbCallbackExecutor();
metadata = new TbMsgMetaData();
config = new TbCreateAlarmNodeConfiguration();
lenient().when(ctxMock.getDbCallbackExecutor()).thenReturn(dbExecutor);
}
@Test
@ -212,10 +217,9 @@ class TbCreateAlarmNodeTest {
// mocks
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingAlarm);
given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(Futures.immediateFuture(alarmDetails));
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingAlarm)));
given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(immediateFuture(alarmDetails));
var apiCallResult = AlarmApiCallResult.builder()
.successful(true)
.created(true)
@ -230,11 +234,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.alarmActionMsg(expectedCreatedAlarmInfo, ruleNodeSelfId, TbMsgType.ENTITY_CREATED)).willReturn(alarmActionMsgMock);
given(ctxMock.transformMsg(any(TbMsg.class), any(TbMsgType.class), any(EntityId.class), any(TbMsgMetaData.class), anyString()))
.willAnswer(answer -> answer.getArgument(0, TbMsg.class).transform()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
);
given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, TbAbstractAlarmNodeConfiguration.ALARM_DETAILS_BUILD_TBEL_TEMPLATE)).willReturn(alarmDetailsScriptMock);
@ -384,10 +388,9 @@ class TbCreateAlarmNodeTest {
// mocks
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingClearedAlarm);
given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(Futures.immediateFuture(alarmDetails));
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingClearedAlarm)));
given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(immediateFuture(alarmDetails));
var apiCallResult = AlarmApiCallResult.builder()
.successful(true)
.created(true)
@ -402,11 +405,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.alarmActionMsg(expectedCreatedAlarmInfo, ruleNodeSelfId, TbMsgType.ENTITY_CREATED)).willReturn(alarmActionMsgMock);
given(ctxMock.transformMsg(any(TbMsg.class), any(TbMsgType.class), any(EntityId.class), any(TbMsgMetaData.class), anyString()))
.willAnswer(answer -> answer.getArgument(0, TbMsg.class).transform()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
);
given(ctxMock.createScriptEngine(ScriptLanguage.JS, config.getAlarmDetailsBuildJs())).willReturn(alarmDetailsScriptMock);
@ -576,10 +579,9 @@ class TbCreateAlarmNodeTest {
// mocks
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingActiveAlarm);
given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(Futures.immediateFuture(newAlarmDetails));
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingActiveAlarm)));
given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(immediateFuture(newAlarmDetails));
doReturn(newEndTs).when(nodeSpy).currentTimeMillis();
var apiCallResult = AlarmApiCallResult.builder()
.successful(true)
@ -595,11 +597,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.alarmActionMsg(expectedUpdatedAlarmInfo, ruleNodeSelfId, TbMsgType.ENTITY_UPDATED)).willReturn(alarmActionMsgMock);
given(ctxMock.transformMsg(any(TbMsg.class), any(TbMsgType.class), any(EntityId.class), any(TbMsgMetaData.class), anyString()))
.willAnswer(answer -> answer.getArgument(0, TbMsg.class).transform()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
);
given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, config.getAlarmDetailsBuildTbel())).willReturn(alarmDetailsScriptMock);
@ -753,9 +755,8 @@ class TbCreateAlarmNodeTest {
// mocks
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingClearedAlarm);
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingClearedAlarm)));
var apiCallResult = AlarmApiCallResult.builder()
.successful(true)
.created(true)
@ -770,11 +771,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.alarmActionMsg(expectedCreatedAlarmInfo, ruleNodeSelfId, TbMsgType.ENTITY_CREATED)).willReturn(alarmActionMsgMock);
given(ctxMock.transformMsg(any(TbMsg.class), any(TbMsgType.class), any(EntityId.class), any(TbMsgMetaData.class), anyString()))
.willAnswer(answer -> answer.getArgument(0, TbMsg.class).transform()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
);
given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, config.getAlarmDetailsBuildTbel())).willReturn(alarmDetailsScriptMock);
@ -941,10 +942,9 @@ class TbCreateAlarmNodeTest {
// mocks
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingActiveAlarm);
given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(Futures.immediateFuture(newAlarmDetails));
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingActiveAlarm)));
given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(immediateFuture(newAlarmDetails));
doReturn(newEndTs).when(nodeSpy).currentTimeMillis();
var apiCallResult = AlarmApiCallResult.builder()
.successful(true)
@ -960,11 +960,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.alarmActionMsg(expectedUpdatedAlarmInfo, ruleNodeSelfId, TbMsgType.ENTITY_UPDATED)).willReturn(alarmActionMsgMock);
given(ctxMock.transformMsg(any(TbMsg.class), any(TbMsgType.class), any(EntityId.class), any(TbMsgMetaData.class), anyString()))
.willAnswer(answer -> answer.getArgument(0, TbMsg.class).transform()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
);
given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, config.getAlarmDetailsBuildTbel())).willReturn(alarmDetailsScriptMock);
@ -1125,10 +1125,9 @@ class TbCreateAlarmNodeTest {
// mocks
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingActiveAlarm);
given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(Futures.immediateFuture(alarmDetails));
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingActiveAlarm)));
given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(immediateFuture(alarmDetails));
doReturn(endTs).when(nodeSpy).currentTimeMillis();
var apiCallResult = AlarmApiCallResult.builder()
.successful(true)
@ -1144,11 +1143,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.alarmActionMsg(expectedUpdatedAlarmInfo, ruleNodeSelfId, TbMsgType.ENTITY_UPDATED)).willReturn(alarmActionMsgMock);
given(ctxMock.transformMsg(any(TbMsg.class), any(TbMsgType.class), any(EntityId.class), any(TbMsgMetaData.class), anyString()))
.willAnswer(answer -> answer.getArgument(0, TbMsg.class).transform()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
.type(answer.getArgument(1, TbMsgType.class))
.originator(answer.getArgument(2, EntityId.class))
.metaData(answer.getArgument(3, TbMsgMetaData.class))
.data(answer.getArgument(4, String.class))
.build()
);
given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, config.getAlarmDetailsBuildTbel())).willReturn(alarmDetailsScriptMock);
@ -1216,11 +1215,11 @@ class TbCreateAlarmNodeTest {
given(ctxMock.getTenantId()).willReturn(tenantId);
given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, config.getAlarmDetailsBuildTbel())).willReturn(alarmDetailsScriptMock);
given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, config.getAlarmType())).willReturn(FluentFuture.from(immediateFuture(null)));
var expectedException = new ExecutionException("Failed to execute script.", new RuntimeException("Something went wrong!"));
given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(Futures.immediateFailedFuture(expectedException));
given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(immediateFailedFuture(expectedException));
nodeSpy.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));