Merge with 2.5.3

This commit is contained in:
Andrii Shvaika 2020-08-05 17:15:42 +03:00
commit 35b2996401
12 changed files with 117 additions and 20 deletions

View File

@ -197,15 +197,15 @@ public class DefaultDeviceStateService implements DeviceStateService {
if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) { if (lastReportedActivity > 0 && lastReportedActivity > lastSavedActivity) {
DeviceStateData stateData = getOrFetchDeviceStateData(deviceId); DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
if (stateData != null) { if (stateData != null) {
DeviceState state = stateData.getState();
stateData.getState().setLastActivityTime(lastReportedActivity);
stateData.getMetaData().putValue("scope", SERVER_SCOPE);
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity); save(deviceId, LAST_ACTIVITY_TIME, lastReportedActivity);
deviceLastSavedActivity.put(deviceId, lastReportedActivity); deviceLastSavedActivity.put(deviceId, lastReportedActivity);
DeviceState state = stateData.getState();
state.setLastActivityTime(lastReportedActivity);
if (!state.isActive()) { if (!state.isActive()) {
state.setActive(true); state.setActive(true);
save(deviceId, ACTIVITY_STATE, state.isActive()); save(deviceId, ACTIVITY_STATE, state.isActive());
stateData.getMetaData().putValue("scope", SERVER_SCOPE);
pushRuleEngineMessage(stateData, ACTIVITY_EVENT);
} }
} }
} }

View File

@ -617,6 +617,16 @@ queue:
max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}" max_poll_records: "${TB_QUEUE_KAFKA_MAX_POLL_RECORDS:8192}"
max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}" max_partition_fetch_bytes: "${TB_QUEUE_KAFKA_MAX_PARTITION_FETCH_BYTES:16777216}"
fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}" fetch_max_bytes: "${TB_QUEUE_KAFKA_FETCH_MAX_BYTES:134217728}"
other:
# Properties for Confluent cloud
# - key: "ssl.endpoint.identification.algorithm"
# value: "https"
# - key: "sasl.mechanism"
# value: "PLAIN"
# - key: "sasl.jaas.config"
# value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";"
# - key: "security.protocol"
# value: "SASL_SSL"
topic-properties: topic-properties:
rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" core: "${TB_QUEUE_KAFKA_CORE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
@ -624,6 +634,7 @@ queue:
notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}" notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000}"
js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}" js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600}"
aws_sqs: aws_sqs:
use_default_credential_provider_chain: "${TB_QUEUE_AWS_SQS_USE_DEFAULT_CREDENTIAL_PROVIDER_CHAIN:false}"
access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}" access_key_id: "${TB_QUEUE_AWS_SQS_ACCESS_KEY_ID:YOUR_KEY}"
secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}" secret_access_key: "${TB_QUEUE_AWS_SQS_SECRET_ACCESS_KEY:YOUR_SECRET}"
region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}" region: "${TB_QUEUE_AWS_SQS_REGION:YOUR_REGION}"

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.queue.kafka; package org.thingsboard.server.queue.kafka;
import lombok.Getter; import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
@ -30,6 +32,7 @@ import java.util.Properties;
*/ */
@Slf4j @Slf4j
@ConditionalOnExpression("'${queue.type:null}'=='kafka'") @ConditionalOnExpression("'${queue.type:null}'=='kafka'")
@ConfigurationProperties(prefix = "queue.kafka")
@Component @Component
public class TbKafkaSettings { public class TbKafkaSettings {
@ -67,7 +70,7 @@ public class TbKafkaSettings {
@Getter @Getter
private int fetchMaxBytes; private int fetchMaxBytes;
@Value("${kafka.other:#{null}}") @Setter
private List<TbKafkaProperty> other; private List<TbKafkaProperty> other;
public Properties toProps() { public Properties toProps() {

View File

@ -16,8 +16,10 @@
package org.thingsboard.server.queue.sqs; package org.thingsboard.server.queue.sqs;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest; import com.amazonaws.services.sqs.model.CreateQueueRequest;
@ -37,9 +39,16 @@ public class TbAwsSqsAdmin implements TbQueueAdmin {
public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) { public TbAwsSqsAdmin(TbAwsSqsSettings sqsSettings, Map<String, String> attributes) {
this.attributes = attributes; this.attributes = attributes;
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); AWSCredentialsProvider credentialsProvider;
if (sqsSettings.getUseDefaultCredentialProviderChain()) {
credentialsProvider = new DefaultAWSCredentialsProviderChain();
} else {
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
}
sqsClient = AmazonSQSClientBuilder.standard() sqsClient = AmazonSQSClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials)) .withCredentials(credentialsProvider)
.withRegion(sqsSettings.getRegion()) .withRegion(sqsSettings.getRegion())
.build(); .build();

View File

@ -16,8 +16,10 @@
package org.thingsboard.server.queue.sqs; package org.thingsboard.server.queue.sqs;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
@ -67,13 +69,19 @@ public class TbAwsSqsConsumerTemplate<T extends TbQueueMsg> extends AbstractPara
this.decoder = decoder; this.decoder = decoder;
this.sqsSettings = sqsSettings; this.sqsSettings = sqsSettings;
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); AWSCredentialsProvider credentialsProvider;
AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); if (sqsSettings.getUseDefaultCredentialProviderChain()) {
credentialsProvider = new DefaultAWSCredentialsProviderChain();
} else {
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
}
this.sqsClient = AmazonSQSClientBuilder.standard() sqsClient = AmazonSQSClientBuilder.standard()
.withCredentials(credProvider) .withCredentials(credentialsProvider)
.withRegion(sqsSettings.getRegion()) .withRegion(sqsSettings.getRegion())
.build(); .build();
} }
@Override @Override

View File

@ -16,8 +16,10 @@
package org.thingsboard.server.queue.sqs; package org.thingsboard.server.queue.sqs;
import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder; import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest; import com.amazonaws.services.sqs.model.SendMessageRequest;
@ -54,14 +56,18 @@ public class TbAwsSqsProducerTemplate<T extends TbQueueMsg> implements TbQueuePr
this.admin = admin; this.admin = admin;
this.defaultTopic = defaultTopic; this.defaultTopic = defaultTopic;
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey()); AWSCredentialsProvider credentialsProvider;
AWSStaticCredentialsProvider credProvider = new AWSStaticCredentialsProvider(awsCredentials); if (sqsSettings.getUseDefaultCredentialProviderChain()) {
credentialsProvider = new DefaultAWSCredentialsProviderChain();
} else {
AWSCredentials awsCredentials = new BasicAWSCredentials(sqsSettings.getAccessKeyId(), sqsSettings.getSecretAccessKey());
credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
}
this.sqsClient = AmazonSQSClientBuilder.standard() sqsClient = AmazonSQSClientBuilder.standard()
.withCredentials(credProvider) .withCredentials(credentialsProvider)
.withRegion(sqsSettings.getRegion()) .withRegion(sqsSettings.getRegion())
.build(); .build();
producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); producerExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
} }

View File

@ -27,6 +27,9 @@ import org.springframework.stereotype.Component;
@Data @Data
public class TbAwsSqsSettings { public class TbAwsSqsSettings {
@Value("${queue.aws_sqs.use_default_credential_provider_chain}")
private Boolean useDefaultCredentialProviderChain;
@Value("${queue.aws_sqs.access_key_id}") @Value("${queue.aws_sqs.access_key_id}")
private String accessKeyId; private String accessKeyId;

View File

@ -53,7 +53,7 @@ import java.util.concurrent.TimeUnit;
*/ */
@Slf4j @Slf4j
@Component("MqttSslHandlerProvider") @Component("MqttSslHandlerProvider")
@ConditionalOnExpression("'${transport.type:null}'=='null' || ('${transport.type}'=='local' && '${transport.http.enabled}'=='true')") @ConditionalOnExpression("'${transport.type:null}'=='null' || ('${transport.type}'=='local' && '${transport.mqtt.enabled}'=='true')")
@ConditionalOnProperty(prefix = "transport.mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false) @ConditionalOnProperty(prefix = "transport.mqtt.ssl", value = "enabled", havingValue = "true", matchIfMissing = false)
public class MqttSslHandlerProvider { public class MqttSslHandlerProvider {

View File

@ -207,6 +207,7 @@ public class JsonMqttAdaptor implements MqttTransportAdaptor {
try { try {
return new JsonParser().parse(payload); return new JsonParser().parse(payload);
} catch (JsonSyntaxException ex) { } catch (JsonSyntaxException ex) {
log.warn("Payload is in incorrect format: {}", payload);
throw new AdaptorException(ex); throw new AdaptorException(ex);
} }
} }

View File

@ -553,7 +553,13 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
String lowerSearchText = "%" + searchText.toLowerCase() + "%"; String lowerSearchText = "%" + searchText.toLowerCase() + "%";
ctx.addStringParameter("lowerSearchTextParam", lowerSearchText); ctx.addStringParameter("lowerSearchTextParam", lowerSearchText);
List<String> searchAliases = selectionMapping.stream().map(EntityKeyMapping::getValueAlias).collect(Collectors.toList()); List<String> searchAliases = selectionMapping.stream().map(EntityKeyMapping::getValueAlias).collect(Collectors.toList());
return String.format(" WHERE LOWER(CONCAT(%s)) LIKE :%s", String.join(" , ", searchAliases), "lowerSearchTextParam"); String searchAliasesExpression;
if (searchAliases.size() > 1) {
searchAliasesExpression = "CONCAT(" + String.join(" , ", searchAliases) + ")";
} else {
searchAliasesExpression = searchAliases.get(0);
}
return String.format(" WHERE LOWER(%s) LIKE :%s", searchAliasesExpression, "lowerSearchTextParam");
} else { } else {
return ""; return "";
} }

View File

@ -18,7 +18,6 @@ package org.thingsboard.rule.engine.action;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbContext;
@ -59,7 +58,7 @@ public class TbClearAlarmNode extends TbAbstractAlarmNode<TbClearAlarmNodeConfig
protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) { protected ListenableFuture<AlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData()); String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg.getMetaData());
ListenableFuture<Alarm> alarmFuture; ListenableFuture<Alarm> alarmFuture;
if(msg.getOriginator().getEntityType().equals(EntityType.ALARM)){ if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId())); alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
} else { } else {
alarmFuture = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType); alarmFuture = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);

View File

@ -36,6 +36,7 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException; import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleChainId;
@ -97,6 +98,7 @@ public class TbAlarmNodeTest {
private ListeningExecutor dbExecutor; private ListeningExecutor dbExecutor;
private EntityId originator = new DeviceId(Uuids.timeBased()); private EntityId originator = new DeviceId(Uuids.timeBased());
private EntityId alarmOriginator = new AlarmId(Uuids.timeBased());
private TenantId tenantId = new TenantId(Uuids.timeBased()); private TenantId tenantId = new TenantId(Uuids.timeBased());
private TbMsgMetaData metaData = new TbMsgMetaData(); private TbMsgMetaData metaData = new TbMsgMetaData();
private String rawJson = "{\"name\": \"Vit\", \"passed\": 5}"; private String rawJson = "{\"name\": \"Vit\", \"passed\": 5}";
@ -328,6 +330,55 @@ public class TbAlarmNodeTest {
assertEquals(expectedAlarm, actualAlarm); assertEquals(expectedAlarm, actualAlarm);
} }
@Test
public void alarmCanBeClearedWithAlarmOriginator() throws ScriptException, IOException {
initWithClearAlarmScript();
metaData.putValue("key", "value");
TbMsg msg = TbMsg.newMsg( "USER", alarmOriginator, metaData, TbMsgDataType.JSON, rawJson, ruleChainId, ruleNodeId);
long oldEndDate = System.currentTimeMillis();
AlarmId id = new AlarmId(alarmOriginator.getId());
Alarm activeAlarm = Alarm.builder().type("SomeType").tenantId(tenantId).originator(originator).status(ACTIVE_UNACK).severity(WARNING).endTs(oldEndDate).build();
activeAlarm.setId(id);
when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null));
when(alarmService.findAlarmByIdAsync(tenantId, id)).thenReturn(Futures.immediateFuture(activeAlarm));
when(alarmService.clearAlarm(eq(activeAlarm.getTenantId()), eq(activeAlarm.getId()), org.mockito.Mockito.any(JsonNode.class), anyLong())).thenReturn(Futures.immediateFuture(true));
// doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(activeAlarm);
node.onMsg(ctx, msg);
verify(ctx).tellNext(any(), eq("Cleared"));
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
ArgumentCaptor<String> typeCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<EntityId> originatorCaptor = ArgumentCaptor.forClass(EntityId.class);
ArgumentCaptor<TbMsgMetaData> metadataCaptor = ArgumentCaptor.forClass(TbMsgMetaData.class);
ArgumentCaptor<String> dataCaptor = ArgumentCaptor.forClass(String.class);
verify(ctx).transformMsg(msgCaptor.capture(), typeCaptor.capture(), originatorCaptor.capture(), metadataCaptor.capture(), dataCaptor.capture());
assertEquals("ALARM", typeCaptor.getValue());
assertEquals(alarmOriginator, originatorCaptor.getValue());
assertEquals("value", metadataCaptor.getValue().getValue("key"));
assertEquals(Boolean.TRUE.toString(), metadataCaptor.getValue().getValue(IS_CLEARED_ALARM));
assertNotSame(metaData, metadataCaptor.getValue());
Alarm actualAlarm = new ObjectMapper().readValue(dataCaptor.getValue().getBytes(), Alarm.class);
Alarm expectedAlarm = Alarm.builder()
.tenantId(tenantId)
.originator(originator)
.status(CLEARED_UNACK)
.severity(WARNING)
.propagate(false)
.type("SomeType")
.details(null)
.endTs(oldEndDate)
.build();
expectedAlarm.setId(id);
assertEquals(expectedAlarm, actualAlarm);
}
private void initWithCreateAlarmScript() { private void initWithCreateAlarmScript() {
try { try {
TbCreateAlarmNodeConfiguration config = new TbCreateAlarmNodeConfiguration(); TbCreateAlarmNodeConfiguration config = new TbCreateAlarmNodeConfiguration();