Merge pull request #13741 from dskarzh/create-alarm-node-async-processing
Create alarm node: process message asynchronously to avoid blocking dispatcher thread
This commit is contained in:
		
						commit
						2637963c08
					
				@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.service.telemetry;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import com.google.common.util.concurrent.FutureCallback;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
@ -171,6 +172,11 @@ public class DefaultAlarmSubscriptionService extends AbstractSubscriptionService
 | 
			
		||||
        return alarmService.findLatestActiveByOriginatorAndType(tenantId, originator, type);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
 | 
			
		||||
        return alarmService.findLatestActiveByOriginatorAndTypeAsync(tenantId, originator, type);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Alarm findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type) {
 | 
			
		||||
        return alarmService.findLatestActiveByOriginatorAndType(tenantId, originator, type);
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.dao.alarm;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.EntitySubtype;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
@ -105,6 +106,8 @@ public interface AlarmService extends EntityDaoService {
 | 
			
		||||
 | 
			
		||||
    Alarm findLatestActiveByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
 | 
			
		||||
                                                        AlarmDataQuery query, Collection<EntityId> orderedEntityIds);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.dao.alarm;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.EntitySubtype;
 | 
			
		||||
import org.thingsboard.server.common.data.alarm.Alarm;
 | 
			
		||||
@ -47,15 +48,14 @@ import java.util.List;
 | 
			
		||||
import java.util.Set;
 | 
			
		||||
import java.util.UUID;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by ashvayka on 11.05.17.
 | 
			
		||||
 */
 | 
			
		||||
public interface AlarmDao extends Dao<Alarm> {
 | 
			
		||||
 | 
			
		||||
    Alarm findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    Alarm findLatestActiveByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    ListenableFuture<Alarm> findLatestByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    Alarm findAlarmById(TenantId tenantId, UUID key);
 | 
			
		||||
 | 
			
		||||
@ -18,6 +18,7 @@ package org.thingsboard.server.dao.alarm;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.base.Function;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.RequiredArgsConstructor;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
@ -169,6 +170,11 @@ public class BaseAlarmService extends AbstractCachedEntityService<TenantId, Page
 | 
			
		||||
        return alarmDao.findLatestActiveByOriginatorAndType(tenantId, originator, type);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
 | 
			
		||||
        return alarmDao.findLatestActiveByOriginatorAndTypeAsync(tenantId, originator, type);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public PageData<AlarmData> findAlarmDataByQueryForEntities(TenantId tenantId,
 | 
			
		||||
                                                               AlarmDataQuery query, Collection<EntityId> orderedEntityIds) {
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.server.dao.sql.alarm;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import lombok.extern.slf4j.Slf4j;
 | 
			
		||||
import org.springframework.beans.factory.annotation.Autowired;
 | 
			
		||||
@ -79,9 +80,6 @@ import static org.thingsboard.server.common.data.page.SortOrder.Direction.ASC;
 | 
			
		||||
import static org.thingsboard.server.dao.DaoUtil.convertTenantEntityTypesToDto;
 | 
			
		||||
import static org.thingsboard.server.dao.DaoUtil.toPageable;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * Created by Valerii Sosliuk on 5/19/2017.
 | 
			
		||||
 */
 | 
			
		||||
@Slf4j
 | 
			
		||||
@Component
 | 
			
		||||
@SqlDao
 | 
			
		||||
@ -124,6 +122,11 @@ public class JpaAlarmDao extends JpaAbstractDao<AlarmEntity, Alarm> implements A
 | 
			
		||||
        return latest.isEmpty() ? null : DaoUtil.getData(latest.get(0));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
 | 
			
		||||
        return FluentFuture.from(service.submit(() -> findLatestActiveByOriginatorAndType(tenantId, originator, type)));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public ListenableFuture<Alarm> findLatestByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type) {
 | 
			
		||||
        return service.submit(() -> findLatestByOriginatorAndType(tenantId, originator, type));
 | 
			
		||||
 | 
			
		||||
@ -16,6 +16,7 @@
 | 
			
		||||
package org.thingsboard.rule.engine.api;
 | 
			
		||||
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.ListenableFuture;
 | 
			
		||||
import org.thingsboard.server.common.data.EntitySubtype;
 | 
			
		||||
@ -78,6 +79,8 @@ public interface RuleEngineAlarmService {
 | 
			
		||||
 | 
			
		||||
    Alarm findLatestActiveByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    FluentFuture<Alarm> findLatestActiveByOriginatorAndTypeAsync(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    Alarm findLatestByOriginatorAndType(TenantId tenantId, EntityId originator, String type);
 | 
			
		||||
 | 
			
		||||
    AlarmInfo findAlarmInfoById(TenantId tenantId, AlarmId alarmId);
 | 
			
		||||
 | 
			
		||||
@ -70,7 +70,6 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    protected TbCreateAlarmNodeConfiguration loadAlarmNodeConfig(TbNodeConfiguration configuration) throws TbNodeException {
 | 
			
		||||
        TbCreateAlarmNodeConfiguration nodeConfiguration = TbNodeUtils.convert(configuration, TbCreateAlarmNodeConfiguration.class);
 | 
			
		||||
@ -96,12 +95,13 @@ public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConf
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Alarm existingAlarm = ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
 | 
			
		||||
        return ctx.getAlarmService().findLatestActiveByOriginatorAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), alarmType).transformAsync(existingAlarm -> {
 | 
			
		||||
            if (existingAlarm == null || existingAlarm.getStatus().isCleared()) {
 | 
			
		||||
                return createNewAlarm(ctx, msg, msgAlarm);
 | 
			
		||||
            } else {
 | 
			
		||||
                return updateAlarm(ctx, msg, existingAlarm, msgAlarm);
 | 
			
		||||
            }
 | 
			
		||||
        }, ctx.getDbCallbackExecutor());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    private Alarm getAlarmFromMessage(TbContext ctx, TbMsg msg) throws IOException {
 | 
			
		||||
 | 
			
		||||
@ -17,7 +17,7 @@ package org.thingsboard.rule.engine.action;
 | 
			
		||||
 | 
			
		||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
 | 
			
		||||
import com.fasterxml.jackson.databind.JsonNode;
 | 
			
		||||
import com.google.common.util.concurrent.Futures;
 | 
			
		||||
import com.google.common.util.concurrent.FluentFuture;
 | 
			
		||||
import org.junit.jupiter.api.BeforeEach;
 | 
			
		||||
import org.junit.jupiter.api.DisplayName;
 | 
			
		||||
import org.junit.jupiter.api.Test;
 | 
			
		||||
@ -60,6 +60,8 @@ import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.concurrent.ExecutionException;
 | 
			
		||||
 | 
			
		||||
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
 | 
			
		||||
import static com.google.common.util.concurrent.Futures.immediateFuture;
 | 
			
		||||
import static org.assertj.core.api.Assertions.assertThat;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.any;
 | 
			
		||||
import static org.mockito.ArgumentMatchers.anyString;
 | 
			
		||||
@ -67,6 +69,7 @@ import static org.mockito.BDDMockito.given;
 | 
			
		||||
import static org.mockito.BDDMockito.then;
 | 
			
		||||
import static org.mockito.Mockito.doReturn;
 | 
			
		||||
import static org.mockito.Mockito.eq;
 | 
			
		||||
import static org.mockito.Mockito.lenient;
 | 
			
		||||
import static org.mockito.Mockito.never;
 | 
			
		||||
 | 
			
		||||
@ExtendWith(MockitoExtension.class)
 | 
			
		||||
@ -99,6 +102,8 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        dbExecutor = new TestDbCallbackExecutor();
 | 
			
		||||
        metadata = new TbMsgMetaData();
 | 
			
		||||
        config = new TbCreateAlarmNodeConfiguration();
 | 
			
		||||
 | 
			
		||||
        lenient().when(ctxMock.getDbCallbackExecutor()).thenReturn(dbExecutor);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
@ -212,10 +217,9 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        // mocks
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingAlarm);
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(Futures.immediateFuture(alarmDetails));
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingAlarm)));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(immediateFuture(alarmDetails));
 | 
			
		||||
        var apiCallResult = AlarmApiCallResult.builder()
 | 
			
		||||
                .successful(true)
 | 
			
		||||
                .created(true)
 | 
			
		||||
@ -384,10 +388,9 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        // mocks
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingClearedAlarm);
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(Futures.immediateFuture(alarmDetails));
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingClearedAlarm)));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(immediateFuture(alarmDetails));
 | 
			
		||||
        var apiCallResult = AlarmApiCallResult.builder()
 | 
			
		||||
                .successful(true)
 | 
			
		||||
                .created(true)
 | 
			
		||||
@ -576,10 +579,9 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        // mocks
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingActiveAlarm);
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(Futures.immediateFuture(newAlarmDetails));
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingActiveAlarm)));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(immediateFuture(newAlarmDetails));
 | 
			
		||||
        doReturn(newEndTs).when(nodeSpy).currentTimeMillis();
 | 
			
		||||
        var apiCallResult = AlarmApiCallResult.builder()
 | 
			
		||||
                .successful(true)
 | 
			
		||||
@ -753,9 +755,8 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        // mocks
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingClearedAlarm);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingClearedAlarm)));
 | 
			
		||||
        var apiCallResult = AlarmApiCallResult.builder()
 | 
			
		||||
                .successful(true)
 | 
			
		||||
                .created(true)
 | 
			
		||||
@ -941,10 +942,9 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        // mocks
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingActiveAlarm);
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(Futures.immediateFuture(newAlarmDetails));
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingActiveAlarm)));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(immediateFuture(newAlarmDetails));
 | 
			
		||||
        doReturn(newEndTs).when(nodeSpy).currentTimeMillis();
 | 
			
		||||
        var apiCallResult = AlarmApiCallResult.builder()
 | 
			
		||||
                .successful(true)
 | 
			
		||||
@ -1125,10 +1125,9 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
        // mocks
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.getSelfId()).willReturn(ruleNodeSelfId);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndType(tenantId, msgOriginator, alarmType)).willReturn(existingActiveAlarm);
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(Futures.immediateFuture(alarmDetails));
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, alarmType)).willReturn(FluentFuture.from(immediateFuture(existingActiveAlarm)));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(any())).willReturn(immediateFuture(alarmDetails));
 | 
			
		||||
        doReturn(endTs).when(nodeSpy).currentTimeMillis();
 | 
			
		||||
        var apiCallResult = AlarmApiCallResult.builder()
 | 
			
		||||
                .successful(true)
 | 
			
		||||
@ -1216,11 +1215,11 @@ class TbCreateAlarmNodeTest {
 | 
			
		||||
 | 
			
		||||
        given(ctxMock.getTenantId()).willReturn(tenantId);
 | 
			
		||||
        given(ctxMock.getAlarmService()).willReturn(alarmServiceMock);
 | 
			
		||||
        given(ctxMock.getDbCallbackExecutor()).willReturn(dbExecutor);
 | 
			
		||||
        given(ctxMock.createScriptEngine(ScriptLanguage.TBEL, config.getAlarmDetailsBuildTbel())).willReturn(alarmDetailsScriptMock);
 | 
			
		||||
        given(alarmServiceMock.findLatestActiveByOriginatorAndTypeAsync(tenantId, msgOriginator, config.getAlarmType())).willReturn(FluentFuture.from(immediateFuture(null)));
 | 
			
		||||
 | 
			
		||||
        var expectedException = new ExecutionException("Failed to execute script.", new RuntimeException("Something went wrong!"));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(Futures.immediateFailedFuture(expectedException));
 | 
			
		||||
        given(alarmDetailsScriptMock.executeJsonAsync(incomingMsg)).willReturn(immediateFailedFuture(expectedException));
 | 
			
		||||
 | 
			
		||||
        nodeSpy.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user