Merge pull request #5765 from volodymyr-babak/edge-event-sort-fix-2
[3.3.3] Edge tests fixes #2
This commit is contained in:
commit
2734757f32
@ -501,8 +501,9 @@ public final class EdgeGrpcSession implements Closeable {
|
||||
|
||||
private ListenableFuture<List<Void>> updateQueueStartTs(Long newStartTs) {
|
||||
log.trace("[{}] updating QueueStartTs [{}][{}]", this.sessionId, edge.getId(), newStartTs);
|
||||
newStartTs = ++newStartTs; // increments ts by 1 - next edge event search starts from current offset + 1
|
||||
List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
|
||||
List<AttributeKvEntry> attributes = Collections.singletonList(
|
||||
new BaseAttributeKvEntry(
|
||||
new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
|
||||
return ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
|
||||
}
|
||||
|
||||
|
||||
@ -15,25 +15,13 @@
|
||||
*/
|
||||
package org.thingsboard.server.service.edge.rpc.fetch;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.thingsboard.server.common.data.BaseData;
|
||||
import org.thingsboard.server.common.data.edge.Edge;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventType;
|
||||
import org.thingsboard.server.common.data.id.EntityId;
|
||||
import org.thingsboard.server.common.data.id.EventId;
|
||||
import org.thingsboard.server.common.data.id.HasId;
|
||||
import org.thingsboard.server.common.data.id.HasUUID;
|
||||
import org.thingsboard.server.common.data.id.IdBased;
|
||||
import org.thingsboard.server.common.data.id.RuleChainId;
|
||||
import org.thingsboard.server.common.data.id.TenantId;
|
||||
import org.thingsboard.server.common.data.id.UUIDBased;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
import org.thingsboard.server.common.data.rule.RuleChain;
|
||||
import org.thingsboard.server.service.edge.rpc.EdgeEventUtils;
|
||||
import org.thingsboard.server.common.data.page.SortOrder;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@ -64,6 +64,10 @@
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-commons</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
||||
@ -18,9 +18,18 @@ package org.thingsboard.server.common.data.page;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import lombok.Data;
|
||||
|
||||
import org.springframework.data.domain.Sort;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Data
|
||||
public class PageLink {
|
||||
|
||||
protected static final String DEFAULT_SORT_PROPERTY = "id";
|
||||
private static final Sort DEFAULT_SORT = Sort.by(Sort.Direction.ASC, DEFAULT_SORT_PROPERTY);
|
||||
|
||||
private final String textSearch;
|
||||
private final int pageSize;
|
||||
private final int page;
|
||||
@ -57,4 +66,28 @@ public class PageLink {
|
||||
return new PageLink(this.pageSize, this.page+1, this.textSearch, this.sortOrder);
|
||||
}
|
||||
|
||||
public Sort toSort(SortOrder sortOrder, Map<String,String> columnMap) {
|
||||
if (sortOrder == null) {
|
||||
return DEFAULT_SORT;
|
||||
} else {
|
||||
String property = sortOrder.getProperty();
|
||||
if (columnMap.containsKey(property)) {
|
||||
property = columnMap.get(property);
|
||||
}
|
||||
return Sort.by(Sort.Direction.fromString(sortOrder.getDirection().name()), property);
|
||||
}
|
||||
}
|
||||
|
||||
public Sort toSort(List<SortOrder> sortOrders, Map<String,String> columnMap) {
|
||||
return Sort.by(sortOrders.stream().map(s -> toSortOrder(s, columnMap)).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
private Sort.Order toSortOrder(SortOrder sortOrder, Map<String,String> columnMap) {
|
||||
String property = sortOrder.getProperty();
|
||||
if (columnMap.containsKey(property)) {
|
||||
property = columnMap.get(property);
|
||||
}
|
||||
return new Sort.Order(Sort.Direction.fromString(sortOrder.getDirection().name()), property, Sort.NullHandling.NULLS_LAST);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -19,6 +19,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.ToString;
|
||||
import org.springframework.data.domain.Sort;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
@ToString(callSuper = true)
|
||||
@ -61,4 +66,30 @@ public class TimePageLink extends PageLink {
|
||||
return new TimePageLink(this.getPageSize(), this.getPage()+1, this.getTextSearch(), this.getSortOrder(),
|
||||
this.startTime, this.endTime);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sort toSort(SortOrder sortOrder, Map<String,String> columnMap) {
|
||||
if (sortOrder == null) {
|
||||
return super.toSort(sortOrder, columnMap);
|
||||
} else {
|
||||
return toSort(new ArrayList<>(List.of(sortOrder)), columnMap);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Sort toSort(List<SortOrder> sortOrders, Map<String,String> columnMap) {
|
||||
if (!isDefaultSortOrderAvailable(sortOrders)) {
|
||||
sortOrders.add(new SortOrder(DEFAULT_SORT_PROPERTY, SortOrder.Direction.ASC));
|
||||
}
|
||||
return super.toSort(sortOrders, columnMap);
|
||||
}
|
||||
|
||||
private boolean isDefaultSortOrderAvailable(List<SortOrder> sortOrders) {
|
||||
for (SortOrder sortOrder : sortOrders) {
|
||||
if (DEFAULT_SORT_PROPERTY.equals(sortOrder.getProperty())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ package org.thingsboard.server.dao;
|
||||
import org.springframework.data.domain.Page;
|
||||
import org.springframework.data.domain.PageRequest;
|
||||
import org.springframework.data.domain.Pageable;
|
||||
import org.springframework.data.domain.Sort;
|
||||
|
||||
import org.thingsboard.server.common.data.id.UUIDBased;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.PageLink;
|
||||
@ -32,13 +32,9 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public abstract class DaoUtil {
|
||||
|
||||
public static final String DEFAULT_SORT_PROPERTY = "id";
|
||||
public static final Sort DEFAULT_SORT = Sort.by(Sort.Direction.ASC, DEFAULT_SORT_PROPERTY);
|
||||
|
||||
private DaoUtil() {
|
||||
}
|
||||
|
||||
@ -56,7 +52,7 @@ public abstract class DaoUtil {
|
||||
}
|
||||
|
||||
public static Pageable toPageable(PageLink pageLink, Map<String,String> columnMap) {
|
||||
return PageRequest.of(pageLink.getPage(), pageLink.getPageSize(), toSort(pageLink.getSortOrder(), columnMap));
|
||||
return PageRequest.of(pageLink.getPage(), pageLink.getPageSize(), pageLink.toSort(pageLink.getSortOrder(), columnMap));
|
||||
}
|
||||
|
||||
public static Pageable toPageable(PageLink pageLink, List<SortOrder> sortOrders) {
|
||||
@ -64,43 +60,7 @@ public abstract class DaoUtil {
|
||||
}
|
||||
|
||||
public static Pageable toPageable(PageLink pageLink, Map<String,String> columnMap, List<SortOrder> sortOrders) {
|
||||
return PageRequest.of(pageLink.getPage(), pageLink.getPageSize(), toSort(sortOrders, columnMap));
|
||||
}
|
||||
|
||||
public static Sort toSort(SortOrder sortOrder) {
|
||||
return toSort(sortOrder, Collections.emptyMap());
|
||||
}
|
||||
|
||||
public static Sort toSort(SortOrder sortOrder, Map<String,String> columnMap) {
|
||||
if (sortOrder == null) {
|
||||
return DEFAULT_SORT;
|
||||
} else {
|
||||
String property = sortOrder.getProperty();
|
||||
if (columnMap.containsKey(property)) {
|
||||
property = columnMap.get(property);
|
||||
}
|
||||
return Sort.by(Sort.Direction.fromString(sortOrder.getDirection().name()), property);
|
||||
}
|
||||
}
|
||||
|
||||
public static Sort toSort(List<SortOrder> sortOrders) {
|
||||
return toSort(sortOrders, Collections.emptyMap());
|
||||
}
|
||||
|
||||
public static Sort toSort(List<SortOrder> sortOrders, Map<String,String> columnMap) {
|
||||
return toSort(sortOrders, columnMap, Sort.NullHandling.NULLS_LAST);
|
||||
}
|
||||
|
||||
public static Sort toSort(List<SortOrder> sortOrders, Map<String,String> columnMap, Sort.NullHandling nullHandlingHint) {
|
||||
return Sort.by(sortOrders.stream().map(s -> toSortOrder(s, columnMap, nullHandlingHint)).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public static Sort.Order toSortOrder(SortOrder sortOrder, Map<String,String> columnMap, Sort.NullHandling nullHandlingHint) {
|
||||
String property = sortOrder.getProperty();
|
||||
if (columnMap.containsKey(property)) {
|
||||
property = columnMap.get(property);
|
||||
}
|
||||
return new Sort.Order(Sort.Direction.fromString(sortOrder.getDirection().name()), property, nullHandlingHint);
|
||||
return PageRequest.of(pageLink.getPage(), pageLink.getPageSize(), pageLink.toSort(sortOrders, columnMap));
|
||||
}
|
||||
|
||||
public static <T> List<T> convertDataList(Collection<? extends ToData<T>> toDataList) {
|
||||
|
||||
@ -30,7 +30,7 @@ public interface EdgeEventRepository extends PagingAndSortingRepository<EdgeEven
|
||||
@Query("SELECT e FROM EdgeEventEntity e WHERE " +
|
||||
"e.tenantId = :tenantId " +
|
||||
"AND e.edgeId = :edgeId " +
|
||||
"AND (:startTime IS NULL OR e.createdTime >= :startTime) " +
|
||||
"AND (:startTime IS NULL OR e.createdTime > :startTime) " +
|
||||
"AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
|
||||
"AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
|
||||
)
|
||||
@ -44,7 +44,7 @@ public interface EdgeEventRepository extends PagingAndSortingRepository<EdgeEven
|
||||
@Query("SELECT e FROM EdgeEventEntity e WHERE " +
|
||||
"e.tenantId = :tenantId " +
|
||||
"AND e.edgeId = :edgeId " +
|
||||
"AND (:startTime IS NULL OR e.createdTime >= :startTime) " +
|
||||
"AND (:startTime IS NULL OR e.createdTime > :startTime) " +
|
||||
"AND (:endTime IS NULL OR e.createdTime <= :endTime) " +
|
||||
"AND e.edgeEventAction <> 'TIMESERIES_UPDATED' " +
|
||||
"AND LOWER(e.edgeEventType) LIKE LOWER(CONCAT('%', :textSearch, '%'))"
|
||||
|
||||
@ -16,23 +16,19 @@
|
||||
package org.thingsboard.server.dao.sql.edge;
|
||||
|
||||
import com.datastax.oss.driver.api.core.uuid.Uuids;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.repository.CrudRepository;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEvent;
|
||||
import org.thingsboard.server.common.data.edge.EdgeEventActionType;
|
||||
import org.thingsboard.server.common.data.id.EdgeEventId;
|
||||
import org.thingsboard.server.common.data.id.EdgeId;
|
||||
import org.thingsboard.server.common.data.id.EventId;
|
||||
import org.thingsboard.server.common.data.page.PageData;
|
||||
import org.thingsboard.server.common.data.page.TimePageLink;
|
||||
import org.thingsboard.server.dao.DaoUtil;
|
||||
import org.thingsboard.server.dao.edge.EdgeEventDao;
|
||||
import org.thingsboard.server.dao.model.sql.EdgeEventEntity;
|
||||
import org.thingsboard.server.dao.model.sql.EventEntity;
|
||||
import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
|
||||
|
||||
import java.sql.Connection;
|
||||
|
||||
@ -106,7 +106,7 @@ public abstract class BaseEdgeEventServiceTest extends AbstractServiceTest {
|
||||
EdgeId edgeId = new EdgeId(Uuids.timeBased());
|
||||
DeviceId deviceId = new DeviceId(Uuids.timeBased());
|
||||
TenantId tenantId = new TenantId(Uuids.timeBased());
|
||||
TimePageLink pageLink = new TimePageLink(1);
|
||||
TimePageLink pageLink = new TimePageLink(1, 0, null, new SortOrder("createdTime", SortOrder.Direction.ASC));
|
||||
|
||||
EdgeEvent edgeEventWithTsUpdate = generateEdgeEvent(tenantId, edgeId, deviceId, EdgeEventActionType.TIMESERIES_UPDATED);
|
||||
edgeEventService.save(edgeEventWithTsUpdate);
|
||||
|
||||
6
pom.xml
6
pom.xml
@ -40,6 +40,7 @@
|
||||
<jakarta.xml.bind-api.version>2.3.2</jakarta.xml.bind-api.version>
|
||||
<jaxb-runtime.version>2.3.2</jaxb-runtime.version>
|
||||
<spring-boot.version>2.3.12.RELEASE</spring-boot.version>
|
||||
<spring-data.version>2.3.9.RELEASE</spring-data.version>
|
||||
<spring.version>5.2.16.RELEASE</spring.version>
|
||||
<spring-redis.version>5.2.11.RELEASE</spring-redis.version>
|
||||
<spring-security.version>5.4.7</spring-security.version>
|
||||
@ -1130,6 +1131,11 @@
|
||||
<artifactId>spring-boot-starter-data-jpa</artifactId>
|
||||
<version>${spring-boot.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.data</groupId>
|
||||
<artifactId>spring-data-commons</artifactId>
|
||||
<version>${spring-data.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
|
||||
@ -98,9 +98,7 @@ export class EdgeDownlinkTableConfig extends EntityTableConfig<EdgeEvent, TimePa
|
||||
map[attribute.key] = attribute;
|
||||
return map;
|
||||
}, {});
|
||||
if (edge.queueStartTs) {
|
||||
this.queueStartTs = edge.queueStartTs.lastUpdateTs;
|
||||
}
|
||||
this.queueStartTs = edge.queueStartTs && edge.queueStartTs.value ? edge.queueStartTs.value : 0;
|
||||
}
|
||||
|
||||
private updateColumns(updateTableColumns: boolean = false): void {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user