Merge branch 'master' of github.com:thingsboard/thingsboard
This commit is contained in:
commit
57a0a27753
@ -157,7 +157,8 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
private final DbTypeInfoComponent dbTypeInfoComponent;
|
private final DbTypeInfoComponent dbTypeInfoComponent;
|
||||||
private final TbApiUsageReportClient apiUsageReportClient;
|
private final TbApiUsageReportClient apiUsageReportClient;
|
||||||
private final NotificationRuleProcessor notificationRuleProcessor;
|
private final NotificationRuleProcessor notificationRuleProcessor;
|
||||||
@Autowired @Lazy
|
@Autowired
|
||||||
|
@Lazy
|
||||||
private TelemetrySubscriptionService tsSubService;
|
private TelemetrySubscriptionService tsSubService;
|
||||||
|
|
||||||
@Value("${state.defaultInactivityTimeoutInSec}")
|
@Value("${state.defaultInactivityTimeoutInSec}")
|
||||||
@ -362,14 +363,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
if (proto.getAdded()) {
|
if (proto.getAdded()) {
|
||||||
Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
|
Futures.addCallback(fetchDeviceState(device), new FutureCallback<>() {
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(@Nullable DeviceStateData state) {
|
public void onSuccess(DeviceStateData state) {
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, device.getId());
|
||||||
if (addDeviceUsingState(tpi, state)) {
|
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
|
||||||
save(deviceId, ACTIVITY_STATE, false);
|
boolean isMyPartition = deviceIds != null;
|
||||||
|
if (isMyPartition) {
|
||||||
|
deviceIds.add(state.getDeviceId());
|
||||||
|
initializeActivityState(deviceId, state);
|
||||||
callback.onSuccess();
|
callback.onSuccess();
|
||||||
} else {
|
} else {
|
||||||
log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}"
|
log.debug("[{}][{}] Device belongs to external partition. Probably rebalancing is in progress. Topic: {}", tenantId, deviceId, tpi.getFullTopicName());
|
||||||
, tenantId, deviceId, tpi.getFullTopicName());
|
|
||||||
callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"));
|
callback.onFailure(new RuntimeException("Device belongs to external partition " + tpi.getFullTopicName() + "!"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -400,6 +403,21 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
|
||||||
|
cleanupEntity(deviceId);
|
||||||
|
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
||||||
|
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
|
||||||
|
if (deviceIdSet != null) {
|
||||||
|
deviceIdSet.remove(deviceId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeActivityState(DeviceId deviceId, DeviceStateData fetchedState) {
|
||||||
|
DeviceStateData cachedState = deviceStates.putIfAbsent(fetchedState.getDeviceId(), fetchedState);
|
||||||
|
boolean activityState = Objects.requireNonNullElse(cachedState, fetchedState).getState().isActive();
|
||||||
|
save(deviceId, ACTIVITY_STATE, activityState);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
protected Map<TopicPartitionInfo, List<ListenableFuture<?>>> onAddedPartitions(Set<TopicPartitionInfo> addedPartitions) {
|
||||||
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
|
var result = new HashMap<TopicPartitionInfo, List<ListenableFuture<?>>>();
|
||||||
@ -436,10 +454,16 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
}
|
}
|
||||||
if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) {
|
if (devicePackFutureHolder.future == null || !devicePackFutureHolder.future.isCancelled()) {
|
||||||
for (var state : states) {
|
for (var state : states) {
|
||||||
if (!addDeviceUsingState(entry.getKey(), state)) {
|
TopicPartitionInfo tpi = entry.getKey();
|
||||||
return;
|
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
|
||||||
|
boolean isMyPartition = deviceIds != null;
|
||||||
|
if (isMyPartition) {
|
||||||
|
deviceIds.add(state.getDeviceId());
|
||||||
|
deviceStates.putIfAbsent(state.getDeviceId(), state);
|
||||||
|
checkAndUpdateState(state.getDeviceId(), state);
|
||||||
|
} else {
|
||||||
|
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
|
||||||
}
|
}
|
||||||
checkAndUpdateState(state.getDeviceId(), state);
|
|
||||||
}
|
}
|
||||||
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
|
log.info("[{}] Initialized {} out of {} device states", entry.getKey().getPartition().orElse(0), counter.addAndGet(states.size()), entry.getValue().size());
|
||||||
}
|
}
|
||||||
@ -475,18 +499,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean addDeviceUsingState(TopicPartitionInfo tpi, DeviceStateData state) {
|
|
||||||
Set<DeviceId> deviceIds = partitionedEntities.get(tpi);
|
|
||||||
if (deviceIds != null) {
|
|
||||||
deviceIds.add(state.getDeviceId());
|
|
||||||
deviceStates.putIfAbsent(state.getDeviceId(), state);
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
log.debug("[{}] Device belongs to external partition {}", state.getDeviceId(), tpi.getFullTopicName());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void checkStates() {
|
void checkStates() {
|
||||||
try {
|
try {
|
||||||
final long ts = getCurrentTimeMillis();
|
final long ts = getCurrentTimeMillis();
|
||||||
@ -619,15 +631,6 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
|
|||||||
return cleanup;
|
return cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void onDeviceDeleted(TenantId tenantId, DeviceId deviceId) {
|
|
||||||
cleanupEntity(deviceId);
|
|
||||||
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, deviceId);
|
|
||||||
Set<DeviceId> deviceIdSet = partitionedEntities.get(tpi);
|
|
||||||
if (deviceIdSet != null) {
|
|
||||||
deviceIdSet.remove(deviceId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
|
protected void cleanupEntityOnPartitionRemoval(DeviceId deviceId) {
|
||||||
cleanupEntity(deviceId);
|
cleanupEntity(deviceId);
|
||||||
|
|||||||
@ -15,6 +15,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.server.service.state;
|
package org.thingsboard.server.service.state;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
@ -28,8 +29,10 @@ import org.mockito.junit.jupiter.MockitoExtension;
|
|||||||
import org.springframework.test.util.ReflectionTestUtils;
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
import org.thingsboard.server.cluster.TbClusterService;
|
import org.thingsboard.server.cluster.TbClusterService;
|
||||||
import org.thingsboard.server.common.data.AttributeScope;
|
import org.thingsboard.server.common.data.AttributeScope;
|
||||||
|
import org.thingsboard.server.common.data.Device;
|
||||||
import org.thingsboard.server.common.data.DeviceIdInfo;
|
import org.thingsboard.server.common.data.DeviceIdInfo;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
|
import org.thingsboard.server.common.data.id.EntityId;
|
||||||
import org.thingsboard.server.common.data.id.TenantId;
|
import org.thingsboard.server.common.data.id.TenantId;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
|
import org.thingsboard.server.common.data.notification.rule.trigger.DeviceActivityTrigger;
|
||||||
@ -41,11 +44,13 @@ import org.thingsboard.server.common.msg.TbMsg;
|
|||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
import org.thingsboard.server.common.msg.notification.NotificationRuleProcessor;
|
||||||
import org.thingsboard.server.common.msg.queue.ServiceType;
|
import org.thingsboard.server.common.msg.queue.ServiceType;
|
||||||
|
import org.thingsboard.server.common.msg.queue.TbCallback;
|
||||||
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
|
||||||
import org.thingsboard.server.dao.attributes.AttributesService;
|
import org.thingsboard.server.dao.attributes.AttributesService;
|
||||||
import org.thingsboard.server.dao.device.DeviceService;
|
import org.thingsboard.server.dao.device.DeviceService;
|
||||||
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
import org.thingsboard.server.dao.sql.query.EntityQueryRepository;
|
||||||
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
import org.thingsboard.server.dao.timeseries.TimeseriesService;
|
||||||
|
import org.thingsboard.server.gen.transport.TransportProtos;
|
||||||
import org.thingsboard.server.queue.discovery.PartitionService;
|
import org.thingsboard.server.queue.discovery.PartitionService;
|
||||||
import org.thingsboard.server.queue.discovery.QueueKey;
|
import org.thingsboard.server.queue.discovery.QueueKey;
|
||||||
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
|
||||||
@ -66,6 +71,7 @@ import java.util.stream.Stream;
|
|||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
import static org.awaitility.Awaitility.await;
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
import static org.mockito.ArgumentMatchers.anyLong;
|
import static org.mockito.ArgumentMatchers.anyLong;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.BDDMockito.given;
|
import static org.mockito.BDDMockito.given;
|
||||||
@ -1070,4 +1076,106 @@ public class DefaultDeviceStateServiceTest {
|
|||||||
then(service).should().fetchDeviceStateDataUsingSeparateRequests(deviceId);
|
then(service).should().fetchDeviceStateDataUsingSeparateRequests(deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenDeviceAdded_whenOnQueueMsg_thenShouldCacheAndSaveActivityToFalse() throws InterruptedException {
|
||||||
|
// GIVEN
|
||||||
|
final long defaultTimeout = 1000;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
|
||||||
|
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
|
||||||
|
|
||||||
|
TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
|
||||||
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||||
|
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
|
||||||
|
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
|
||||||
|
.setAdded(true)
|
||||||
|
.setUpdated(false)
|
||||||
|
.setDeleted(false)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
service.onQueueMsg(proto, TbCallback.EMPTY);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false);
|
||||||
|
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(false), any());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenDeviceActivityEventHappenedAfterAdded_whenOnDeviceActivity_thenShouldCacheAndSaveActivityToTrue() throws InterruptedException {
|
||||||
|
// GIVEN
|
||||||
|
final long defaultTimeout = 1000;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
DeviceState deviceState = DeviceState.builder()
|
||||||
|
.active(false)
|
||||||
|
.inactivityTimeout(service.getDefaultInactivityTimeoutInSec())
|
||||||
|
.build();
|
||||||
|
DeviceStateData stateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.deviceCreationTime(currentTime - 10000)
|
||||||
|
.state(deviceState)
|
||||||
|
.metaData(TbMsgMetaData.EMPTY)
|
||||||
|
.build();
|
||||||
|
service.deviceStates.put(deviceId, stateData);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
service.onDeviceActivity(tenantId, deviceId, currentTime);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
|
||||||
|
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(LAST_ACTIVITY_TIME), eq(currentTime), any());
|
||||||
|
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenDeviceActivityEventHappenedBeforeAdded_whenOnQueueMsg_thenShouldSaveActivityStateUsingValueFromCache() throws InterruptedException {
|
||||||
|
// GIVEN
|
||||||
|
final long defaultTimeout = 1000;
|
||||||
|
initStateService(defaultTimeout);
|
||||||
|
given(deviceService.findDeviceById(any(TenantId.class), any(DeviceId.class))).willReturn(new Device(deviceId));
|
||||||
|
given(attributesService.find(any(TenantId.class), any(EntityId.class), any(AttributeScope.class), anyList())).willReturn(Futures.immediateFuture(Collections.emptyList()));
|
||||||
|
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
DeviceState deviceState = DeviceState.builder()
|
||||||
|
.active(true)
|
||||||
|
.lastConnectTime(currentTime - 8000)
|
||||||
|
.lastActivityTime(currentTime - 4000)
|
||||||
|
.lastDisconnectTime(0)
|
||||||
|
.lastInactivityAlarmTime(0)
|
||||||
|
.inactivityTimeout(3000)
|
||||||
|
.build();
|
||||||
|
DeviceStateData stateData = DeviceStateData.builder()
|
||||||
|
.tenantId(tenantId)
|
||||||
|
.deviceId(deviceId)
|
||||||
|
.deviceCreationTime(currentTime - 10000)
|
||||||
|
.state(deviceState)
|
||||||
|
.build();
|
||||||
|
service.deviceStates.put(deviceId, stateData);
|
||||||
|
|
||||||
|
// WHEN
|
||||||
|
TransportProtos.DeviceStateServiceMsgProto proto = TransportProtos.DeviceStateServiceMsgProto.newBuilder()
|
||||||
|
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
|
||||||
|
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
|
||||||
|
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
|
||||||
|
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
|
||||||
|
.setAdded(true)
|
||||||
|
.setUpdated(false)
|
||||||
|
.setDeleted(false)
|
||||||
|
.build();
|
||||||
|
service.onQueueMsg(proto, TbCallback.EMPTY);
|
||||||
|
|
||||||
|
// THEN
|
||||||
|
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
|
||||||
|
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
|
||||||
|
then(telemetrySubscriptionService).should().saveAttrAndNotify(eq(TenantId.SYS_TENANT_ID), eq(deviceId), eq(AttributeScope.SERVER_SCOPE), eq(ACTIVITY_STATE), eq(true), any());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -35,10 +35,13 @@ import org.thingsboard.server.common.data.StringUtils;
|
|||||||
import org.thingsboard.server.common.data.plugin.ComponentType;
|
import org.thingsboard.server.common.data.plugin.ComponentType;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
import org.thingsboard.server.dao.exception.DataValidationException;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import static org.thingsboard.server.dao.service.ConstraintValidator.validateFields;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@RuleNode(
|
@RuleNode(
|
||||||
type = ComponentType.EXTERNAL,
|
type = ComponentType.EXTERNAL,
|
||||||
@ -62,10 +65,9 @@ public class TbAwsLambdaNode extends TbAbstractExternalNode {
|
|||||||
@Override
|
@Override
|
||||||
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
|
||||||
config = TbNodeUtils.convert(configuration, TbAwsLambdaNodeConfiguration.class);
|
config = TbNodeUtils.convert(configuration, TbAwsLambdaNodeConfiguration.class);
|
||||||
if (StringUtils.isBlank(config.getFunctionName())) {
|
String errorPrefix = "'" + ctx.getSelf().getName() + "' node configuration is invalid: ";
|
||||||
throw new TbNodeException("Function name must be set!", true);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
|
validateFields(config, errorPrefix);
|
||||||
AWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
|
AWSCredentials awsCredentials = new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
|
||||||
client = AWSLambdaAsyncClientBuilder.standard()
|
client = AWSLambdaAsyncClientBuilder.standard()
|
||||||
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
|
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
|
||||||
@ -74,6 +76,8 @@ public class TbAwsLambdaNode extends TbAbstractExternalNode {
|
|||||||
.withConnectionTimeout((int) TimeUnit.SECONDS.toMillis(config.getConnectionTimeout()))
|
.withConnectionTimeout((int) TimeUnit.SECONDS.toMillis(config.getConnectionTimeout()))
|
||||||
.withRequestTimeout((int) TimeUnit.SECONDS.toMillis(config.getRequestTimeout())))
|
.withRequestTimeout((int) TimeUnit.SECONDS.toMillis(config.getRequestTimeout())))
|
||||||
.build();
|
.build();
|
||||||
|
} catch (DataValidationException e) {
|
||||||
|
throw new TbNodeException(e, true);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new TbNodeException(e);
|
throw new TbNodeException(e);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -15,6 +15,8 @@
|
|||||||
*/
|
*/
|
||||||
package org.thingsboard.rule.engine.aws.lambda;
|
package org.thingsboard.rule.engine.aws.lambda;
|
||||||
|
|
||||||
|
import jakarta.validation.constraints.Min;
|
||||||
|
import jakarta.validation.constraints.NotBlank;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
import org.thingsboard.rule.engine.api.NodeConfiguration;
|
||||||
|
|
||||||
@ -23,12 +25,18 @@ public class TbAwsLambdaNodeConfiguration implements NodeConfiguration<TbAwsLamb
|
|||||||
|
|
||||||
public static final String DEFAULT_QUALIFIER = "$LATEST";
|
public static final String DEFAULT_QUALIFIER = "$LATEST";
|
||||||
|
|
||||||
|
@NotBlank
|
||||||
private String accessKey;
|
private String accessKey;
|
||||||
|
@NotBlank
|
||||||
private String secretKey;
|
private String secretKey;
|
||||||
|
@NotBlank
|
||||||
private String region;
|
private String region;
|
||||||
|
@NotBlank
|
||||||
private String functionName;
|
private String functionName;
|
||||||
private String qualifier;
|
private String qualifier;
|
||||||
|
@Min(0)
|
||||||
private int connectionTimeout;
|
private int connectionTimeout;
|
||||||
|
@Min(0)
|
||||||
private int requestTimeout;
|
private int requestTimeout;
|
||||||
private boolean tellFailureIfFuncThrowsExc;
|
private boolean tellFailureIfFuncThrowsExc;
|
||||||
|
|
||||||
|
|||||||
@ -41,6 +41,7 @@ import org.thingsboard.rule.engine.api.util.TbNodeUtils;
|
|||||||
import org.thingsboard.server.common.data.StringUtils;
|
import org.thingsboard.server.common.data.StringUtils;
|
||||||
import org.thingsboard.server.common.data.id.DeviceId;
|
import org.thingsboard.server.common.data.id.DeviceId;
|
||||||
import org.thingsboard.server.common.data.msg.TbMsgType;
|
import org.thingsboard.server.common.data.msg.TbMsgType;
|
||||||
|
import org.thingsboard.server.common.data.rule.RuleNode;
|
||||||
import org.thingsboard.server.common.msg.TbMsg;
|
import org.thingsboard.server.common.msg.TbMsg;
|
||||||
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
import org.thingsboard.server.common.msg.TbMsgMetaData;
|
||||||
|
|
||||||
@ -50,8 +51,8 @@ import java.util.Map;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
||||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
|
||||||
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
|
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
import static org.mockito.ArgumentMatchers.eq;
|
import static org.mockito.ArgumentMatchers.eq;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
@ -72,13 +73,17 @@ public class TbAwsLambdaNodeTest {
|
|||||||
private AWSLambdaAsync clientMock;
|
private AWSLambdaAsync clientMock;
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
void setUp() {
|
public void setUp() {
|
||||||
node = new TbAwsLambdaNode();
|
node = new TbAwsLambdaNode();
|
||||||
config = new TbAwsLambdaNodeConfiguration().defaultConfiguration();
|
config = new TbAwsLambdaNodeConfiguration().defaultConfiguration();
|
||||||
|
config.setAccessKey("accessKey");
|
||||||
|
config.setSecretKey("secretKey");
|
||||||
|
config.setFunctionName("new-function");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void verifyDefaultConfig() {
|
public void verifyDefaultConfig() {
|
||||||
|
config = new TbAwsLambdaNodeConfiguration().defaultConfiguration();
|
||||||
assertThat(config.getAccessKey()).isNull();
|
assertThat(config.getAccessKey()).isNull();
|
||||||
assertThat(config.getSecretKey()).isNull();
|
assertThat(config.getSecretKey()).isNull();
|
||||||
assertThat(config.getRegion()).isEqualTo(("us-east-1"));
|
assertThat(config.getRegion()).isEqualTo(("us-east-1"));
|
||||||
@ -94,10 +99,43 @@ public class TbAwsLambdaNodeTest {
|
|||||||
@ValueSource(strings = " ")
|
@ValueSource(strings = " ")
|
||||||
public void givenInvalidFunctionName_whenInit_thenThrowsException(String funcName) {
|
public void givenInvalidFunctionName_whenInit_thenThrowsException(String funcName) {
|
||||||
config.setFunctionName(funcName);
|
config.setFunctionName(funcName);
|
||||||
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
|
verifyValidationExceptionOnInit();
|
||||||
assertThatThrownBy(() -> node.init(ctx, configuration))
|
}
|
||||||
.isInstanceOf(TbNodeException.class)
|
|
||||||
.hasMessage("Function name must be set!");
|
@ParameterizedTest
|
||||||
|
@NullAndEmptySource
|
||||||
|
@ValueSource(strings = " ")
|
||||||
|
public void givenInvalidAccessKey_whenInit_thenThrowsException(String accessKey) {
|
||||||
|
config.setAccessKey(accessKey);
|
||||||
|
verifyValidationExceptionOnInit();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@NullAndEmptySource
|
||||||
|
@ValueSource(strings = " ")
|
||||||
|
public void givenInvalidSecretAccessKey_whenInit_thenThrowsException(String secretAccessKey) {
|
||||||
|
config.setSecretKey(secretAccessKey);
|
||||||
|
verifyValidationExceptionOnInit();
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@NullAndEmptySource
|
||||||
|
@ValueSource(strings = " ")
|
||||||
|
public void givenInvalidRegion_whenInit_thenThrowsException(String region) {
|
||||||
|
config.setRegion(region);
|
||||||
|
verifyValidationExceptionOnInit();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenInvalidConnectionTimeout_whenInit_thenThrowsException() {
|
||||||
|
config.setConnectionTimeout(-100);
|
||||||
|
verifyValidationExceptionOnInit();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenInvalidRequestTimeout_whenInit_thenThrowsException() {
|
||||||
|
config.setRequestTimeout(-100);
|
||||||
|
verifyValidationExceptionOnInit();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ -280,10 +318,19 @@ public class TbAwsLambdaNodeTest {
|
|||||||
assertThat(throwableCaptor.getValue()).isInstanceOf(AWSLambdaException.class).hasMessageStartingWith(errorMsg);
|
assertThat(throwableCaptor.getValue()).isInstanceOf(AWSLambdaException.class).hasMessageStartingWith(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void verifyValidationExceptionOnInit() {
|
||||||
|
RuleNode ruleNode = new RuleNode();
|
||||||
|
ruleNode.setName("test");
|
||||||
|
when(ctx.getSelf()).thenReturn(ruleNode);
|
||||||
|
String errorPrefix = "'test' node configuration is invalid: ";
|
||||||
|
assertThatThrownBy(() -> node.init(ctx, new TbNodeConfiguration(JacksonUtil.valueToTree(config))))
|
||||||
|
.isInstanceOf(TbNodeException.class)
|
||||||
|
.hasMessageContaining(errorPrefix)
|
||||||
|
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
|
||||||
|
.isEqualTo(true);
|
||||||
|
}
|
||||||
|
|
||||||
private void init() {
|
private void init() {
|
||||||
config.setAccessKey("accessKey");
|
|
||||||
config.setSecretKey("secretKey");
|
|
||||||
config.setFunctionName("new-function");
|
|
||||||
ReflectionTestUtils.setField(node, "client", clientMock);
|
ReflectionTestUtils.setField(node, "client", clientMock);
|
||||||
ReflectionTestUtils.setField(node, "config", config);
|
ReflectionTestUtils.setField(node, "config", config);
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user