Merge branch 'develop/3.4' of github.com:thingsboard/thingsboard into develop/3.4
This commit is contained in:
commit
33e736d4ca
@ -74,6 +74,7 @@ import org.thingsboard.server.gen.edge.v1.UserCredentialsRequestMsg;
|
|||||||
import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
|
import org.thingsboard.server.gen.edge.v1.WidgetBundleTypesRequestMsg;
|
||||||
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
|
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
|
||||||
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
|
||||||
|
import org.thingsboard.server.service.state.DefaultDeviceStateService;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -162,6 +163,9 @@ public class DefaultEdgeRequestsService implements EdgeRequestsService {
|
|||||||
Map<String, Object> entityData = new HashMap<>();
|
Map<String, Object> entityData = new HashMap<>();
|
||||||
ObjectNode attributes = mapper.createObjectNode();
|
ObjectNode attributes = mapper.createObjectNode();
|
||||||
for (AttributeKvEntry attr : ssAttributes) {
|
for (AttributeKvEntry attr : ssAttributes) {
|
||||||
|
if (DefaultDeviceStateService.PERSISTENT_ATTRIBUTES.contains(attr.getKey())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
|
if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
|
||||||
attributes.put(attr.getKey(), attr.getBooleanValue().get());
|
attributes.put(attr.getKey(), attr.getBooleanValue().get());
|
||||||
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
|
} else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
|
||||||
|
|||||||
@ -266,7 +266,6 @@ sql:
|
|||||||
batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
|
batch_size: "${SQL_EDGE_EVENTS_BATCH_SIZE:1000}"
|
||||||
batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
|
batch_max_delay: "${SQL_EDGE_EVENTS_BATCH_MAX_DELAY_MS:100}"
|
||||||
stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}"
|
stats_print_interval_ms: "${SQL_EDGE_EVENTS_BATCH_STATS_PRINT_MS:10000}"
|
||||||
batch_threads: "${SQL_EDGE_EVENTS_BATCH_THREADS:3}" # batch thread count have to be a prime number like 3 or 5 to gain perfect hash distribution
|
|
||||||
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
# Specify whether to sort entities before batch update. Should be enabled for cluster mode to avoid deadlocks
|
||||||
batch_sort: "${SQL_BATCH_SORT:false}"
|
batch_sort: "${SQL_BATCH_SORT:false}"
|
||||||
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
|
# Specify whether to remove null characters from strValue of attributes and timeseries before insert
|
||||||
|
|||||||
@ -72,9 +72,6 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
|
|||||||
@Value("${sql.edge_events.stats_print_interval_ms:10000}")
|
@Value("${sql.edge_events.stats_print_interval_ms:10000}")
|
||||||
private long statsPrintIntervalMs;
|
private long statsPrintIntervalMs;
|
||||||
|
|
||||||
@Value("${sql.edge_events.batch_threads:3}")
|
|
||||||
private int batchThreads;
|
|
||||||
|
|
||||||
private TbSqlBlockingQueueWrapper<EdgeEventEntity> queue;
|
private TbSqlBlockingQueueWrapper<EdgeEventEntity> queue;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@ -110,7 +107,7 @@ public class JpaBaseEdgeEventDao extends JpaAbstractSearchTextDao<EdgeEventEntit
|
|||||||
return NULL_UUID.hashCode();
|
return NULL_UUID.hashCode();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory);
|
queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, 1, statsFactory);
|
||||||
queue.init(logExecutor, v -> edgeEventInsertRepository.save(v),
|
queue.init(logExecutor, v -> edgeEventInsertRepository.save(v),
|
||||||
Comparator.comparing(EdgeEventEntity::getTs)
|
Comparator.comparing(EdgeEventEntity::getTs)
|
||||||
);
|
);
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user