Optimized postgres sync (#3208)

* optimized postgres sync

* Fix consumer stopping when stopWhenRead enabled

* Fix NPE on EDQS repartitioning

* fixed EntityServiceTest

* Fix EDQS yml props

---------

Co-authored-by: ViacheslavKlimov <viacheslavklimov11@gmail.com>
This commit is contained in:
ViacheslavKlimov 2025-01-30 12:43:14 +02:00
parent 86b5378d59
commit c8db304ce6
27 changed files with 426 additions and 653 deletions

View File

@ -125,6 +125,7 @@ public class DefaultEdqsService implements EdqsService {
executor.submit(() -> {
try {
EdqsSyncState syncState = getSyncState();
// FIXME: Slavik smart events check
if (edqsSyncService.isSyncNeeded() || syncState == null || syncState.getStatus() != EdqsSyncStatus.FINISHED) {
if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) {
processSystemRequest(ToCoreEdqsRequest.builder()

View File

@ -1,539 +0,0 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.edqs;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.dataformat.csv.CsvMapper;
import com.fasterxml.jackson.dataformat.csv.CsvSchema;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.ObjectType;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.converter.Converter;
import org.thingsboard.server.common.data.converter.ConverterType;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edqs.AttributeKv;
import org.thingsboard.server.common.data.edqs.LatestTsKv;
import org.thingsboard.server.common.data.group.EntityGroup;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
import org.thingsboard.server.common.data.id.ConverterId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EdgeId;
import org.thingsboard.server.common.data.id.EntityGroupId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.IntegrationId;
import org.thingsboard.server.common.data.id.RoleId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.SchedulerEventId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.TenantProfileId;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.id.WidgetsBundleId;
import org.thingsboard.server.common.data.integration.Integration;
import org.thingsboard.server.common.data.integration.IntegrationType;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.role.Role;
import org.thingsboard.server.common.data.role.RoleType;
import org.thingsboard.server.common.data.rule.RuleChain;
import org.thingsboard.server.common.data.scheduler.SchedulerEvent;
import org.thingsboard.server.common.data.widget.WidgetType;
import org.thingsboard.server.common.data.widget.WidgetsBundle;
import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.edqs.processor.EdqsConverter;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import static org.thingsboard.common.util.JacksonUtil.toJsonNode;
@RequiredArgsConstructor
@Slf4j
//@Service
public class EdqsDataLoader {
private final EdqsService edqsService;
private final EdqsConverter edqsConverter;
public final static TenantId MAIN = TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60"));
private final String folder = "/home/viacheslav/Downloads/schwarz";
private ExecutorService executor = Executors.newFixedThreadPool(5, ThingsBoardThreadFactory.forName("edqs-publisher"));
// @AfterStartUp(order = 100)
public void load() throws Exception {
loadCustomers();
loadDeviceProfile();
loadDevices();
loadAssets();
loadEdges();
loadEntityViews();
loadTenants();
loadUsers();
loadDashboards();
loadRuleChains();
loadWidgetType();
loadWidgetBundle();
loadConverters();
loadIntegrations();
loadSchedulerEvents();
loadRoles();
loadApiUsageStates();
loadAssetProfile();
loadEntityGroups();
loadRelations();
loadAttributes();
loadTs();
}
private void loadCustomers() throws Exception {
load("customer.csv", (values) -> {
Customer customer = new Customer();
customer.setTitle(values.get("title"));
customer.setId(new CustomerId(UUID.fromString(values.get("id"))));
customer.setCreatedTime(Long.parseLong(values.get("created_time")));
customer.setTenantId(tenantId(values.get("tenant_id")));
var parentCustomerId = values.get("parent_customer_id");
if (StringUtils.isNotEmpty(parentCustomerId)) {
customer.setParentCustomerId(new CustomerId(UUID.fromString(parentCustomerId)));
}
edqsService.onUpdate(customer.getTenantId(), customer.getId(), customer);
});
}
private void loadDevices() throws Exception {
load("device.csv", (values) -> {
Device device = new Device();
device.setType(values.get("type"));
device.setName(values.get("name"));
device.setLabel(values.get("label"));
device.setId(new DeviceId(uuid(values.get("id"))));
device.setCreatedTime(parseLong(values.get("created_time")));
device.setCustomerId(customerId(values.get("customer_id")));
device.setTenantId(tenantId(values.get("tenant_id")));
device.setDeviceProfileId(new DeviceProfileId(uuid(values.get("device_profile_id"))));
device.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(device.getTenantId(), device.getId(), device);
});
}
private void loadAssets() throws Exception {
load("asset.csv", (values) -> {
Asset asset = new Asset();
asset.setType(values.get("type"));
asset.setName(values.get("name"));
asset.setLabel(values.get("label"));
asset.setId(new AssetId(uuid(values.get("id"))));
asset.setCreatedTime(parseLong(values.get("created_time")));
asset.setCustomerId(customerId(values.get("customer_id")));
asset.setTenantId(tenantId(values.get("tenant_id")));
asset.setAssetProfileId(new AssetProfileId(uuid(values.get("asset_profile_id"))));
asset.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(asset.getTenantId(), asset.getId(), asset);
});
}
private void loadEdges() throws Exception {
load("edge.csv", (values) -> {
Edge edge = new Edge();
edge.setId(new EdgeId(uuid(values.get("id"))));
edge.setCreatedTime(parseLong(values.get("created_time")));
edge.setType(values.get("type"));
edge.setName(values.get("name"));
edge.setLabel(values.get("label"));
edge.setCustomerId(customerId(values.get("customer_id")));
edge.setTenantId(tenantId(values.get("tenant_id")));
edge.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(edge.getTenantId(), edge.getId(), edge);
});
}
private void loadEntityViews() throws Exception {
load("entity_view.csv", (values) -> {
EntityView entityView = new EntityView();
entityView.setId(new EntityViewId(uuid(values.get("id"))));
entityView.setCreatedTime(parseLong(values.get("created_time")));
entityView.setType(values.get("type"));
entityView.setName(values.get("name"));
entityView.setCustomerId(customerId(values.get("customer_id")));
entityView.setTenantId(tenantId(values.get("tenant_id")));
entityView.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(entityView.getTenantId(), entityView.getId(), entityView);
});
}
private void loadTenants() throws Exception {
load("tenant.csv", (values) -> {
Tenant tenant = new Tenant();
tenant.setId(new TenantId(uuid(values.get("id"))));
tenant.setCreatedTime(parseLong(values.get("created_time")));
tenant.setEmail(values.get("email"));
tenant.setTitle(values.get("title"));
tenant.setCountry(values.get("country"));
tenant.setState(values.get("state"));
tenant.setCity(values.get("city"));
tenant.setAddress(values.get("address"));
tenant.setAddress2(values.get("address2"));
tenant.setZip(values.get("zip"));
tenant.setPhone(values.get("phone"));
tenant.setRegion(values.get("region"));
tenant.setTenantProfileId(new TenantProfileId(uuid(values.get("tenant_profile_id"))));
tenant.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(MAIN, tenant.getId(), tenant);
});
}
private void loadUsers() throws Exception {
load("user.csv", (values) -> {
User user = new User();
user.setId(new UserId(uuid(values.get("id"))));
user.setCreatedTime(parseLong(values.get("created_time")));
user.setTenantId(tenantId(values.get("tenant_id")));
user.setFirstName(values.get("first_name"));
user.setLastName(values.get("last_name"));
user.setEmail(values.get("email"));
user.setPhone(values.get("phone"));
user.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(user.getTenantId(), user.getId(), user);
});
}
private void loadDashboards() throws Exception {
load("dashboard.csv", (values) -> {
Dashboard dashboard = new Dashboard();
dashboard.setId(new DashboardId(uuid(values.get("id"))));
dashboard.setCreatedTime(parseLong(values.get("created_time")));
dashboard.setTenantId(tenantId(values.get("tenant_id")));
dashboard.setTitle(values.get("title"));
edqsService.onUpdate(dashboard.getTenantId(), dashboard.getId(), dashboard);
});
}
private void loadEntityGroups() throws Exception {
load("entity_group.csv", (values) -> {
EntityGroup entityGroup = new EntityGroup();
entityGroup.setId(new EntityGroupId(uuid(values.get("id"))));
entityGroup.setCreatedTime(parseLong(values.get("created_time")));
entityGroup.setName(values.get("name"));
entityGroup.setOwnerId(entityId(values.get("owner_type"), values.get("owner_id")));
entityGroup.setType(EntityType.valueOf(values.get("type")));
edqsService.onUpdate(MAIN, entityGroup.getId(), entityGroup);
});
}
private void loadRelations() throws Exception {
load("relation.csv", (values) -> {
EntityRelation entityRelation = new EntityRelation();
entityRelation.setFrom(entityId(values.get("from_type"), values.get("from_id")));
entityRelation.setTo(entityId(values.get("to_type"), values.get("to_id")));
entityRelation.setTypeGroup(RelationTypeGroup.valueOf(values.get("relation_type_group")));
entityRelation.setType(values.get("relation_type"));
edqsService.onUpdate(MAIN, ObjectType.RELATION, entityRelation);
});
}
private void loadRuleChains() throws Exception {
load("rule_chain.csv", (values) -> {
RuleChain ruleChain = new RuleChain();
ruleChain.setId(new RuleChainId(uuid(values.get("id"))));
ruleChain.setCreatedTime(parseLong(values.get("created_time")));
ruleChain.setName(values.get("name"));
ruleChain.setTenantId(tenantId(values.get("tenant_id")));
ruleChain.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(ruleChain.getTenantId(), ruleChain.getId(), ruleChain);
});
}
private void loadWidgetType() throws Exception {
load("widget_type.csv", (values) -> {
WidgetType widgetType = new WidgetType();
widgetType.setId(new WidgetTypeId(uuid(values.get("id"))));
widgetType.setCreatedTime(parseLong(values.get("created_time")));
widgetType.setName(values.get("name"));
widgetType.setTenantId(tenantId(values.get("tenant_id")));
edqsService.onUpdate(widgetType.getTenantId(), widgetType.getId(), widgetType);
});
}
private void loadWidgetBundle() throws Exception {
load("widgets_bundle.csv", (values) -> {
WidgetsBundle widgetsBundle = new WidgetsBundle();
widgetsBundle.setId(new WidgetsBundleId(uuid(values.get("id"))));
widgetsBundle.setCreatedTime(parseLong(values.get("created_time")));
widgetsBundle.setTitle(values.get("title"));
widgetsBundle.setTenantId(tenantId(values.get("tenant_id")));
edqsService.onUpdate(widgetsBundle.getTenantId(), widgetsBundle.getId(), widgetsBundle);
});
}
private void loadConverters() throws Exception {
load("converter.csv", (values) -> {
Converter converter = new Converter();
converter.setId(new ConverterId(uuid(values.get("id"))));
converter.setCreatedTime(parseLong(values.get("created_time")));
converter.setName(values.get("name"));
converter.setType(ConverterType.valueOf(values.get("type")));
converter.setTenantId(tenantId(values.get("tenant_id")));
converter.setEdgeTemplate(parseBoolean(values.get("is_edge_template")));
converter.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(converter.getTenantId(), converter.getId(), converter);
});
}
private void loadIntegrations() throws Exception {
load("integration.csv", (values) -> {
Integration integration = new Integration();
integration.setId(new IntegrationId(uuid(values.get("id"))));
integration.setCreatedTime(parseLong(values.get("created_time")));
integration.setName(values.get("name"));
integration.setType(IntegrationType.valueOf(values.get("type")));
integration.setTenantId(tenantId(values.get("tenant_id")));
integration.setEdgeTemplate(parseBoolean(values.get("is_edge_template")));
integration.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(integration.getTenantId(), integration.getId(), integration);
});
}
private void loadSchedulerEvents() throws Exception {
load("scheduler_event.csv", (values) -> {
SchedulerEvent schedulerEvent = new SchedulerEvent();
schedulerEvent.setId(new SchedulerEventId(uuid(values.get("id"))));
schedulerEvent.setCreatedTime(parseLong(values.get("created_time")));
schedulerEvent.setName(values.get("name"));
schedulerEvent.setType(values.get("type"));
schedulerEvent.setTenantId(tenantId(values.get("tenant_id")));
schedulerEvent.setConfiguration(toJsonNode(values.get("configuration")));
schedulerEvent.setSchedule(toJsonNode(values.get("schedule")));
schedulerEvent.setOriginatorId(entityId(values.get("originator_type"), values.get("originator_id")));
schedulerEvent.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(schedulerEvent.getTenantId(), schedulerEvent.getId(), schedulerEvent);
});
}
private void loadRoles() throws Exception {
load("role.csv", (values) -> {
Role role = new Role();
role.setId(new RoleId(uuid(values.get("id"))));
role.setCreatedTime(parseLong(values.get("created_time")));
role.setName(values.get("name"));
role.setType(RoleType.valueOf(values.get("type")));
role.setTenantId(tenantId(values.get("tenant_id")));
role.setAdditionalInfo(toJsonNode(values.get("additional_info")));
edqsService.onUpdate(role.getTenantId(), role.getId(), role);
});
}
private void loadApiUsageStates() throws Exception {
load("api_usage_state.csv", (values) -> {
ApiUsageState apiUsageState = new ApiUsageState();
apiUsageState.setId(new ApiUsageStateId(uuid(values.get("id"))));
apiUsageState.setCreatedTime(parseLong(values.get("created_time")));
apiUsageState.setEntityId(entityId(values.get("entity_type"), values.get("entity_id")));
apiUsageState.setTenantId(tenantId(values.get("tenant_id")));
edqsService.onUpdate(apiUsageState.getTenantId(), apiUsageState.getId(), apiUsageState);
});
}
private void loadDeviceProfile() throws Exception {
load("device_profile.csv", (values) -> {
DeviceProfile deviceProfile = new DeviceProfile();
deviceProfile.setId(new DeviceProfileId(uuid(values.get("id"))));
deviceProfile.setCreatedTime(parseLong(values.get("created_time")));
deviceProfile.setName(values.get("name"));
deviceProfile.setType(DeviceProfileType.valueOf(values.get("type")));
deviceProfile.setTenantId(tenantId(values.get("tenant_id")));
edqsService.onUpdate(deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile);
});
}
private void loadAssetProfile() throws Exception {
load("asset_profile.csv", (values) -> {
AssetProfile assetProfile = new AssetProfile();
assetProfile.setId(new AssetProfileId(uuid(values.get("id"))));
assetProfile.setCreatedTime(parseLong(values.get("created_time")));
assetProfile.setName(values.get("name"));
assetProfile.setTenantId(tenantId(values.get("tenant_id")));
edqsService.onUpdate(assetProfile.getTenantId(), assetProfile.getId(), assetProfile);
});
}
private void loadAttributes() throws Exception {
load("attribute.csv", (values) -> {
EntityId entityId = EntityIdFactory.getByTypeAndId(values.get("entity_type"), values.get("entity_id"));
long ts = parseLong(values.get("last_update_ts"));
AttributeScope scope = AttributeScope.valueOf(values.get("attribute_type"));
String key = values.get("attribute_key");
KvEntry kvEntry;
if (StringUtils.isNotEmpty(values.get("bool_v"))) {
kvEntry = new BooleanDataEntry(key, "t".equals(values.get("bool_v")));
} else if (StringUtils.isNotEmpty(values.get("str_v"))) {
kvEntry = new StringDataEntry(key, values.get("str_v"));
} else if (StringUtils.isNotEmpty(values.get("long_v"))) {
kvEntry = new LongDataEntry(key, parseLong(values.get("long_v")));
} else if (StringUtils.isNotEmpty(values.get("dbl_v"))) {
kvEntry = new DoubleDataEntry(key, Double.parseDouble(values.get("dbl_v")));
} else if (StringUtils.isNotEmpty(values.get("json_v"))) {
kvEntry = new JsonDataEntry(key, values.get("json_v"));
} else {
kvEntry = new StringDataEntry(key, "");
}
AttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(ts, kvEntry);
AttributeKv attributeKv = new AttributeKv(entityId, scope, attributeKvEntry, 0);
edqsService.onUpdate(MAIN, ObjectType.ATTRIBUTE_KV, attributeKv);
});
}
private void loadTs() throws Exception {
load("ts_kv.csv", (values) -> {
var entityTypeStr = values.get("find_entity_type");
if (StringUtils.isEmpty(entityTypeStr)) {
return;
}
EntityId entityId = EntityIdFactory.getByTypeAndId(values.get("find_entity_type"), values.get("entity_id"));
long ts = parseLong(values.get("ts"));
String key = values.get("key");
KvEntry kvEntry;
if (StringUtils.isNotEmpty(values.get("bool_v"))) {
kvEntry = new BooleanDataEntry(key, "t".equals(values.get("bool_v")));
} else if (StringUtils.isNotEmpty(values.get("str_v"))) {
kvEntry = new StringDataEntry(key, values.get("str_v"));
} else if (StringUtils.isNotEmpty(values.get("long_v"))) {
kvEntry = new LongDataEntry(key, parseLong(values.get("long_v")));
} else if (StringUtils.isNotEmpty(values.get("dbl_v"))) {
kvEntry = new DoubleDataEntry(key, Double.parseDouble(values.get("dbl_v")));
} else if (StringUtils.isNotEmpty(values.get("json_v"))) {
kvEntry = new JsonDataEntry(key, values.get("json_v"));
} else {
kvEntry = new StringDataEntry(key, "");
}
BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(ts, kvEntry);
edqsService.onUpdate(MAIN, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, 0L));
});
}
private void load(String file, Consumer<Map<String, String>> function) throws Exception {
Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("loader-" + file)).submit(() -> {
try {
long ts = System.currentTimeMillis();
CsvSchema schema = CsvSchema.emptySchema().withHeader().withColumnSeparator('|');
CsvMapper mapper = new CsvMapper();
MappingIterator<Map<String, String>> it = mapper
.readerFor(Map.class)
.with(schema)
.readValues(new FileReader(folder + "/" + file));
int success = 0;
int failure = 0;
while (it.hasNextValue()) {
Map<String, String> row = it.nextValue();
try {
function.accept(row);
success++;
if (success % 1000 == 0) {
log.info("Loaded [{}] from [{}]", success, file);
}
} catch (Exception e) {
log.error("Failed to parse str: [{}]", row, e);
failure++;
}
}
log.info("Loaded [{}] from [{}] in {}ms. Failures {}", success, file, (System.currentTimeMillis() - ts), failure);
} catch (Throwable t) {
log.error("Failed to load data from [{}]", file, t);
}
});
}
private static TenantId tenantId(String id) {
return TenantId.fromUUID(UUID.fromString(id));
}
private static CustomerId customerId(String id) {
var c = new CustomerId(UUID.fromString(id));
return c.isNullUid() ? null : c;
}
private static EntityId entityId(String type, String id) {
return EntityIdFactory.getByTypeAndId(type, id);
}
private static UUID uuid(String id) {
return UUID.fromString(id);
}
private static long parseLong(String time) {
return Long.parseLong(time);
}
private static boolean parseBoolean(String value) {
return Boolean.parseBoolean(value);
}
}

View File

@ -32,8 +32,6 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.page.SortOrder;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.dao.Dao;
import org.thingsboard.server.dao.attributes.AttributesDao;
@ -41,13 +39,15 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao;
import org.thingsboard.server.dao.entity.EntityDaoRegistry;
import org.thingsboard.server.dao.group.EntityGroupDao;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.model.sql.RelationEntity;
import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
import org.thingsboard.server.dao.relation.RelationDao;
import org.thingsboard.server.dao.sql.relation.RelationRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.dao.tenant.TenantDao;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@ -92,11 +92,11 @@ public abstract class EdqsSyncService {
@Autowired
private KeyDictionaryDao keyDictionaryDao;
@Autowired
private RelationDao relationDao;
private RelationRepository relationRepository;
@Autowired
private EntityGroupDao entityGroupDao;
@Autowired
private TimeseriesLatestDao timeseriesLatestDao;
private TsKvLatestRepository tsKvLatestRepository;
@Autowired
@Lazy
private DefaultEdqsService edqsService;
@ -171,7 +171,7 @@ public abstract class EdqsSyncService {
private void syncEntityGroups() {
log.info("Synchronizing entity groups to EDQS");
long ts = System.currentTimeMillis();
var entityGroups = new PageDataIterable<>(entityGroupDao::findAllFields, 10000);
var entityGroups = new PageDataIterable<>(entityGroupDao::findAllFields, 30000);
for (EntityFields groupFields : entityGroups) {
EntityIdInfo entityIdInfo = entityInfoMap.get(groupFields.getOwnerId());
if (entityIdInfo != null) {
@ -187,18 +187,43 @@ public abstract class EdqsSyncService {
private void syncRelations() {
log.info("Synchronizing relations to EDQS");
long ts = System.currentTimeMillis();
var relations = new PageDataIterable<>(relationDao::findAll, 10000);
for (EntityRelation relation : relations) {
if (relation.getTypeGroup() == RelationTypeGroup.COMMON || relation.getTypeGroup() == RelationTypeGroup.FROM_ENTITY_GROUP) {
EntityIdInfo entityIdInfo = entityInfoMap.get(relation.getFrom().getId());
UUID lastFromEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000");
String lastFromEntityType = "";
String lastRelationTypeGroup = "";
String lastRelationType = "";
UUID lastToEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000");
String lastToEntityType = "";
while (true) {
List<RelationEntity> batch = relationRepository.findNextBatch(lastFromEntityId, lastFromEntityType, lastRelationTypeGroup,
lastRelationType, lastToEntityId, lastToEntityType, 10000);
if (batch.isEmpty()) {
break;
}
processRelationBatch(batch);
RelationEntity lastRecord = batch.get(batch.size() - 1);
lastFromEntityId = lastRecord.getFromId();
lastFromEntityType = lastRecord.getFromType();
lastRelationTypeGroup = lastRecord.getRelationTypeGroup();
lastRelationType = lastRecord.getRelationType();
lastToEntityId = lastRecord.getToId();
lastToEntityType = lastRecord.getToType();
}
log.info("Finished synchronizing relations to EDQS in {} ms", (System.currentTimeMillis() - ts));
}
private void processRelationBatch(List<RelationEntity> relations) {
for (RelationEntity relation : relations) {
if (RelationTypeGroup.COMMON.name().equals(relation.getRelationTypeGroup()) || (RelationTypeGroup.FROM_ENTITY_GROUP.name().equals(relation.getRelationTypeGroup()))) {
EntityIdInfo entityIdInfo = entityInfoMap.get(relation.getFromId());
if (entityIdInfo != null) {
process(entityIdInfo.tenantId(), RELATION, relation);
process(entityIdInfo.tenantId(), RELATION, relation.toData());
} else {
log.info("Relation from entity not found: " + relation.getFrom());
log.info("Relation from entity not found: " + relation.getFromId());
}
}
}
log.info("Finished synchronizing relations to EDQS in {} ms", (System.currentTimeMillis() - ts));
}
private void loadKeyDictionary() {
@ -214,8 +239,28 @@ public abstract class EdqsSyncService {
private void syncAttributes() {
log.info("Synchronizing attributes to EDQS");
long ts = System.currentTimeMillis();
var attributes = new PageDataIterable<>(attributesDao::findAll, 10000);
for (AttributeKvEntity attribute : attributes) {
UUID lastEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000");
int lastAttributeType = Integer.MIN_VALUE;
int lastAttributeKey = Integer.MIN_VALUE;
while (true) {
List<AttributeKvEntity> batch = attributesDao.findNextBatch(lastEntityId, lastAttributeType, lastAttributeKey, 10000);
if (batch.isEmpty()) {
break;
}
processAttributeBatch(batch);
AttributeKvEntity lastRecord = batch.get(batch.size() - 1);
lastEntityId = lastRecord.getId().getEntityId();
lastAttributeType = lastRecord.getId().getAttributeType();
lastAttributeKey = lastRecord.getId().getAttributeKey();
}
log.info("Finished synchronizing attributes to EDQS in {} ms", (System.currentTimeMillis() - ts));
}
private void processAttributeBatch(List<AttributeKvEntity> batch) {
for (AttributeKvEntity attribute : batch) {
attribute.setStrKey(getStrKeyOrFetchFromDb(attribute.getId().getAttributeKey()));
UUID entityId = attribute.getId().getEntityId();
EntityIdInfo entityIdInfo = entityInfoMap.get(entityId);
@ -230,13 +275,29 @@ public abstract class EdqsSyncService {
attribute.getVersion());
process(entityIdInfo.tenantId(), ATTRIBUTE_KV, attributeKv);
}
log.info("Finished synchronizing attributes to EDQS in {} ms", (System.currentTimeMillis() - ts));
}
private void syncLatestTimeseries() {
log.info("Synchronizing latest timeseries to EDQS");
long ts = System.currentTimeMillis();
var tsKvLatestEntities = new PageDataIterable<>(pageLink -> timeseriesLatestDao.findAllLatest(pageLink), 10000);
UUID lastEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000");
int lastKey = Integer.MIN_VALUE;
while (true) {
List<TsKvLatestEntity> batch = tsKvLatestRepository.findNextBatch(lastEntityId, lastKey, 10000);
if (batch.isEmpty()) {
break;
}
processTsKvLatestBatch(batch);
TsKvLatestEntity lastRecord = batch.get(batch.size() - 1);
lastEntityId = lastRecord.getEntityId();
lastKey = lastRecord.getKey();
}
log.info("Finished synchronizing latest timeseries to EDQS in {} ms", (System.currentTimeMillis() - ts));
}
private void processTsKvLatestBatch(List<TsKvLatestEntity> tsKvLatestEntities) {
for (TsKvLatestEntity tsKvLatestEntity : tsKvLatestEntities) {
try {
String strKey = getStrKeyOrFetchFromDb(tsKvLatestEntity.getKey());
@ -256,7 +317,6 @@ public abstract class EdqsSyncService {
log.error("Failed to sync latest timeseries: {}", tsKvLatestEntity, e);
}
}
log.info("Finished synchronizing latest timeseries to EDQS in {} ms", (System.currentTimeMillis() - ts));
}
private String getStrKeyOrFetchFromDb(int key) {
@ -265,7 +325,9 @@ public abstract class EdqsSyncService {
return strKey;
} else {
strKey = keyDictionaryDao.getKey(key);
keys.putIfAbsent(key, strKey);
if (strKey != null) {
keys.put(key, strKey);
}
}
return strKey;
}

View File

@ -40,7 +40,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService {
@Override
public boolean isSyncNeeded() {
return kafkaAdmin.isTopicEmpty(EdqsQueue.STATE.getTopic());
return kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic());
}

View File

@ -1616,11 +1616,11 @@ queue:
# Kafka properties for Edge event topic
edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}"
# Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions
edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
# Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions
edqs-state: "${TB_QUEUE_KAFKA_EDQS_LATEST_EVENTS_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
@ -1701,6 +1701,7 @@ queue:
local:
rocksdb_path: "${TB_EDQS_ROCKSDB_PATH:/tmp/edqs-backup}"
partitions: "${TB_EDQS_PARTITIONS:12}"
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
responses_topic: "${TB_EDQS_RESPONSES_TOPIC:edqs.responses}"
poll_interval: "${TB_EDQS_POLL_INTERVAL_MS:125}"

View File

@ -210,7 +210,6 @@ public class EntityServiceTest extends AbstractControllerTest {
countByQueryAndCheck(countQuery, 0);
}
@Test
public void testCountHierarchicalEntitiesByQuery() throws InterruptedException {
List<Asset> assets = new ArrayList<>();
@ -463,6 +462,7 @@ public class EntityServiceTest extends AbstractControllerTest {
deviceService.deleteDevicesByTenantId(tenantId);
}
// fails for sql implementation until we fix the issue with the relation query
@Test
public void testCountHierarchicalEntitiesByMultiRootQuery() throws InterruptedException {
List<Asset> buildings = new ArrayList<>();
@ -1443,18 +1443,29 @@ public class EntityServiceTest extends AbstractControllerTest {
String deviceName = result.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue();
assertThat(deviceName).isEqualTo(customerDevices.get(0).getName());
// find by customer user with generic permission
MergedUserPermissions mergedGenericPermission = new MergedUserPermissions(Map.of(Resource.DEVICE, Set.of(Operation.READ)), Collections.emptyMap());
PageData<EntityData> customerResults = findByQueryAndCheck(customerId, mergedGenericPermission, query, 1);
String cutomerDeviceName = customerResults.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue();
assertThat(cutomerDeviceName).isEqualTo(customerDevices.get(0).getName());
// find by customer user with group permission
MergedUserPermissions mergedGroupOnlyPermission = new MergedUserPermissions(Collections.emptyMap(), Map.of(customerDeviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.READ))));
PageData<EntityData> result2 = findByQueryAndCheck(customerId, mergedGroupOnlyPermission, query, 1);
String resultDeviceName2 = result2.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue();
assertThat(resultDeviceName2).isEqualTo(customerDevices.get(0).getName());
// try to find tenant device by customer user
SingleEntityFilter tenantDeviceFilter = new SingleEntityFilter();
tenantDeviceFilter.setSingleEntity(tenantDevices.get(0).getId());
EntityDataQuery customerQuery2 = new EntityDataQuery(tenantDeviceFilter, pageLink, entityFields, null, null);
PageData<EntityData> customerResults2 = entityService.findEntityDataByQuery(tenantId, customerId, customerQuery2);
assertEquals(0, customerResults2.getTotalElements());
findByQueryAndCheck(customerId, mergedGenericPermission, customerQuery2, 0);
// find by tenant user with group permission
PageData<EntityData> results3 = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query);
PageData<EntityData> results3 = findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), mergedGroupOnlyPermission, query, 1);
assertEquals(1, results3.getTotalElements());
String deviceName3 = results3.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue();
assertThat(deviceName3).isEqualTo(customerDevices.get(0).getName());
}
@ -1488,16 +1499,14 @@ public class EntityServiceTest extends AbstractControllerTest {
// find by customer user with generic permissions
apiUsageStateService.createDefaultApiUsageState(tenantId, customerId);
PageData<EntityData> customerResult = entityService.findEntityDataByQuery(tenantId, customerId, query);
PageData<EntityData> customerResult = findByQueryAndCheck(customerId, query, 1);
assertEquals(1, customerResult.getTotalElements());
String customerResultName = customerResult.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue();
assertThat(customerResultName).isEqualTo(TEST_CUSTOMER_NAME);
// find by tenant user with customerId filter
apiUsageStateFilter.setCustomerId(customerId);
PageData<EntityData> tenantResult = searchEntities(query);
assertEquals(1, tenantResult.getTotalElements());
PageData<EntityData> tenantResult = findByQueryAndCheck(query, 1);
String tenantResultName = tenantResult.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue();
assertThat(tenantResultName).isEqualTo(TEST_CUSTOMER_NAME);
}
@ -1625,6 +1634,231 @@ public class EntityServiceTest extends AbstractControllerTest {
deviceService.deleteDevicesByTenantId(tenantId);
}
@Test
public void testFindEntityDataByRelationQuery_blobEntity_customerLevel() {
final int deviceCnt = 2;
final int relationsCnt = 3;
final int blobEntitiesCnt = deviceCnt * relationsCnt;
Customer customer = new Customer();
customer.setTenantId(tenantId);
customer.setTitle("Customer Relation Query");
customer = customerService.saveCustomer(customer);
List<Device> devices = new ArrayList<>();
for (int i = 0; i < deviceCnt; i++) {
Device device = new Device();
device.setTenantId(tenantId);
device.setName("Device relation query " + i);
device.setCustomerId(customer.getId());
device.setType("default");
devices.add(deviceService.saveDevice(device));
}
List<BlobEntity> blobEntities = new ArrayList<>();
for (int i = 0; i < blobEntitiesCnt; i++) {
BlobEntity blobEntity = new BlobEntity();
blobEntity.setName("Blob relation query " + i);
blobEntity.setTenantId(tenantId);
blobEntity.setContentType("image/png");
blobEntity.setData(ByteBuffer.allocate(1024));
blobEntity.setCustomerId(customer.getId());
blobEntity.setType("Report");
blobEntities.add(blobEntityService.saveBlobEntity(blobEntity));
}
for (int i = 0; i < deviceCnt; i++) {
for (int j = 0; j < relationsCnt; j++) {
EntityRelation relationEntity = new EntityRelation();
relationEntity.setFrom(devices.get(i).getId());
relationEntity.setTo(blobEntities.get(j + (i * relationsCnt)).getId());
relationEntity.setTypeGroup(RelationTypeGroup.COMMON);
relationEntity.setType("fileAttached");
relationService.saveRelation(tenantId, relationEntity);
}
}
MergedUserPermissions mergedUserPermissions = new MergedUserPermissions(Map.of(ALL, Set.of(Operation.ALL)), Collections.emptyMap());
RelationEntityTypeFilter relationEntityTypeFilter = new RelationEntityTypeFilter("fileAttached", Collections.singletonList(EntityType.BLOB_ENTITY));
RelationsQueryFilter filter = new RelationsQueryFilter();
filter.setFilters(Collections.singletonList(relationEntityTypeFilter));
filter.setDirection(EntitySearchDirection.FROM);
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null);
for (Device device : devices) {
filter.setRootEntity(device.getId());
EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
findByQueryAndCheck(customer.getId(), mergedUserPermissions, query, relationsCnt);
countByQueryAndCheck(customer.getId(), mergedUserPermissions, query, relationsCnt);
/*
In order to be careful with updating Relation Query while adding new Entity Type,
this checkup will help to find place, where you could check the correctness of building query
*/
Assert.assertEquals(38, EntityType.values().length);
}
}
@Test
public void testFindEntitiesByRelationEntityTypeFilterWithTenantGroupPermission() {
final int assetCount = 2;
final int relationsCnt = 4;
final int deviceEntitiesCnt = assetCount * relationsCnt;
EntityGroup deviceGroup = new EntityGroup();
deviceGroup.setName("Device Tenant Level Group");
deviceGroup.setOwnerId(tenantId);
deviceGroup.setTenantId(tenantId);
deviceGroup.setType(EntityType.DEVICE);
deviceGroup = entityGroupService.saveEntityGroup(tenantId, tenantId, deviceGroup);
List<Asset> assets = new ArrayList<>();
for (int i = 0; i < assetCount; i++) {
Asset building = new Asset();
building.setTenantId(tenantId);
building.setName("Building _" + i);
building.setType("building");
building = assetService.saveAsset(building);
assets.add(building);
}
List<Device> devices = new ArrayList<>();
for (int i = 0; i < deviceEntitiesCnt; i++) {
Device device = new Device();
device.setTenantId(tenantId);
device.setName("Test device " + i);
device.setType("default");
Device savedDevice = deviceService.saveDevice(device);
devices.add(savedDevice);
if (i % 2 == 0) {
entityGroupService.addEntityToEntityGroup(tenantId, deviceGroup.getId(), savedDevice.getId());
}
}
for (int i = 0; i < assetCount; i++) {
for (int j = 0; j < relationsCnt; j++) {
EntityRelation relationEntity = new EntityRelation();
relationEntity.setFrom(assets.get(i).getId());
relationEntity.setTo(devices.get(j + (i * relationsCnt)).getId());
relationEntity.setTypeGroup(RelationTypeGroup.COMMON);
relationEntity.setType("contains");
relationService.saveRelation(tenantId, relationEntity);
}
}
MergedUserPermissions groupOnlyPermission = new MergedUserPermissions(Collections.emptyMap(),
Map.of(deviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.READ))));
RelationEntityTypeFilter relationEntityTypeFilter = new RelationEntityTypeFilter("contains", Collections.singletonList(EntityType.DEVICE));
RelationsQueryFilter filter = new RelationsQueryFilter();
filter.setFilters(Collections.singletonList(relationEntityTypeFilter));
filter.setDirection(EntitySearchDirection.FROM);
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null);
List<KeyFilter> keyFiltersEqualString = createStringKeyFilters("name", EntityKeyType.ENTITY_FIELD, StringOperation.STARTS_WITH, "Test device ");
for (Asset asset : assets) {
filter.setRootEntity(asset.getId());
EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), keyFiltersEqualString);
findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), groupOnlyPermission, query, relationsCnt / 2);
countByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), groupOnlyPermission, query, relationsCnt / 2);
}
}
@Test
public void testFindEntitiesWithRelationEntityTypeFilterByCustomerUser() {
Customer customer = new Customer();
customer.setTenantId(tenantId);
customer.setTitle("Customer Relation Query");
customer = customerService.saveCustomer(customer);
final int assetCount = 2;
final int relationsCnt = 4;
final int deviceEntitiesCnt = assetCount * relationsCnt;
EntityGroup deviceGroup = new EntityGroup();
deviceGroup.setName("Device Tenant Level Group");
deviceGroup.setOwnerId(customer.getId());
deviceGroup.setTenantId(tenantId);
deviceGroup.setType(EntityType.DEVICE);
deviceGroup = entityGroupService.saveEntityGroup(tenantId, tenantId, deviceGroup);
List<Asset> assets = new ArrayList<>();
for (int i = 0; i < assetCount; i++) {
Asset building = new Asset();
building.setTenantId(tenantId);
building.setCustomerId(customer.getId());
building.setName("Building _" + i);
building.setType("building");
building = assetService.saveAsset(building);
assets.add(building);
}
List<Device> devices = new ArrayList<>();
for (int i = 0; i < deviceEntitiesCnt; i++) {
Device device = new Device();
device.setTenantId(tenantId);
device.setCustomerId(customer.getId());
device.setName("Test device " + i);
device.setType("default");
Device savedDevice = deviceService.saveDevice(device);
devices.add(savedDevice);
if (i % 2 == 0) {
entityGroupService.addEntityToEntityGroup(tenantId, deviceGroup.getId(), savedDevice.getId());
}
}
for (int i = 0; i < assetCount; i++) {
for (int j = 0; j < relationsCnt; j++) {
EntityRelation relationEntity = new EntityRelation();
relationEntity.setFrom(assets.get(i).getId());
relationEntity.setTo(devices.get(j + (i * relationsCnt)).getId());
relationEntity.setTypeGroup(RelationTypeGroup.COMMON);
relationEntity.setType("contains");
relationService.saveRelation(tenantId, relationEntity);
}
}
MergedUserPermissions mergedGroupOnlyPermission = new MergedUserPermissions(Collections.emptyMap(), Map.of(deviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.ALL))));
MergedUserPermissions mergedGenericOnlyPermission = new MergedUserPermissions(Map.of(Resource.ALL, Set.of(Operation.ALL)), Collections.emptyMap());
MergedUserPermissions mergedGenericAndGroupPermission = new MergedUserPermissions(Map.of(Resource.ALL, Set.of(Operation.ALL)), Map.of(deviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.ALL))));
RelationEntityTypeFilter relationEntityTypeFilter = new RelationEntityTypeFilter("contains", Collections.singletonList(EntityType.DEVICE));
RelationsQueryFilter filter = new RelationsQueryFilter();
filter.setFilters(Collections.singletonList(relationEntityTypeFilter));
filter.setDirection(EntitySearchDirection.FROM);
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null);
List<KeyFilter> keyFiltersEqualString = createStringKeyFilters("name", EntityKeyType.ENTITY_FIELD, StringOperation.STARTS_WITH, "Test device ");
EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), keyFiltersEqualString);
for (Asset asset : assets) {
filter.setRootEntity(asset.getId());
//check by user with generic permission
PageData<EntityData> relationsResult = findByQueryAndCheck(customer.getId(), mergedGenericOnlyPermission, query, relationsCnt);
countByQueryAndCheck(customer.getId(), mergedGenericOnlyPermission, query, relationsCnt);
//check by user with generic and group permission
PageData<EntityData> relationsResult1 = findByQueryAndCheck(customer.getId(), mergedGenericAndGroupPermission, query, relationsCnt);
countByQueryAndCheck(customer.getId(), mergedGenericAndGroupPermission, query, relationsCnt);
//check by other customer user with group only permission
PageData<EntityData> relationsResult2 = findByQueryAndCheck(otherCustomerId, mergedGroupOnlyPermission, query, relationsCnt / 2);
long relationsResultCnt2 = countByQueryAndCheck(otherCustomerId, mergedGroupOnlyPermission, query, relationsCnt / 2);
Assert.assertEquals(relationsCnt / 2, relationsResult2.getData().size());
Assert.assertEquals(relationsCnt / 2, relationsResultCnt2);
//check by other customer user with generic and group only permission
PageData<EntityData> relationsResult3 = findByQueryAndCheck(otherCustomerId, mergedGenericAndGroupPermission, query, relationsCnt / 2);
long relationsResultCnt3 = countByQueryAndCheck(otherCustomerId, mergedGenericAndGroupPermission, query, relationsCnt / 2);
Assert.assertEquals(relationsCnt / 2, relationsResult3.getData().size());
Assert.assertEquals(relationsCnt / 2, relationsResultCnt3);
}
}
@Test
public void testBuildNumericPredicateQueryOperations() throws ExecutionException, InterruptedException {
@ -2422,7 +2656,7 @@ public class EntityServiceTest extends AbstractControllerTest {
return timeseriesService.save(tenantId, entityId, timeseries);
}
private void createMultiRootHierarchy(List<Asset> buildings, List<Asset> apartments,
protected void createMultiRootHierarchy(List<Asset> buildings, List<Asset> apartments,
Map<String, Map<UUID, String>> entityNameByTypeMap,
Map<UUID, UUID> childParentRelationMap) throws InterruptedException {
for (int k = 0; k < 3; k++) {
@ -2507,7 +2741,7 @@ public class EntityServiceTest extends AbstractControllerTest {
entityView.setEndTimeMs(256);
entityView.setExternalId(new EntityViewId(UUID.randomUUID()));
entityView.setAdditionalInfo(JacksonUtil.newObjectNode().put("test", "test"));
entityView = entityViewDao.save(tenantId, entityView);
entityView = entityViewService.saveEntityView(entityView);
EntityViewTypeFilter entityViewTypeFilter = new EntityViewTypeFilter();
entityViewTypeFilter.setEntityViewNameFilter("test");
@ -2518,21 +2752,18 @@ public class EntityServiceTest extends AbstractControllerTest {
);
EntityDataQuery query = new EntityDataQuery(entityViewTypeFilter, pageLink, entityFields, Collections.emptyList(), null);
PageData<EntityData> relationsResult = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query);
assertThat(relationsResult.getData()).hasSize(1);
PageData<EntityData> relationsResult = findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), query, 1);
assertThat(relationsResult.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue()).isEqualTo(entityView.getName());
// find with non existing name
entityViewTypeFilter.setEntityViewNameFilter("non-existing");
PageData<EntityData> relationsResult2 = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query);
assertThat(relationsResult2.getData()).hasSize(0);
findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), query, 0);
// find with non existing type
entityViewTypeFilter.setEntityViewNameFilter(null);
entityViewTypeFilter.setEntityViewTypes(Collections.singletonList("non-existing"));
PageData<EntityData> relationsResult3 = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query);
assertThat(relationsResult3.getData()).hasSize(0);
findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), query, 0);
}
private PageData<EntityData> findByQuery(EntityDataQuery query) {

View File

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.DeviceProfileType;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.Tenant;
import org.thingsboard.server.common.data.TenantProfile;
@ -275,6 +276,7 @@ public class FieldsUtil {
return ApiUsageStateFields.builder()
.id(entity.getUuidId())
.createdTime(entity.getCreatedTime())
.customerId(entity.getEntityId().getEntityType() == EntityType.CUSTOMER ? entity.getEntityId().getId() : null)
.entityId(entity.getEntityId())
.transportState(entity.getTransportState())
.dbStorageState(entity.getDbStorageState())

View File

@ -42,10 +42,11 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.queue.QueueConfig;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.edqs.state.EdqsStateService;
import org.thingsboard.server.edqs.repo.EdqRepository;
import org.thingsboard.server.edqs.state.EdqsStateService;
import org.thingsboard.server.edqs.util.EdqsPartitionService;
import org.thingsboard.server.edqs.util.VersionsStore;
import org.thingsboard.server.gen.transport.TransportProtos;
@ -70,6 +71,7 @@ import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@EdqsComponent
@ -96,6 +98,8 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
private final VersionsStore versionsStore = new VersionsStore();
private final AtomicInteger counter = new AtomicInteger(); // FIXME: TMP
@PostConstruct
private void init() {
consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer"));
@ -152,15 +156,17 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic()));
Set<TopicPartitionInfo> oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS));
Set<Integer> removedPartitions = Sets.difference(oldPartitions, newPartitions).stream()
.map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet());
if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) {
log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions);
if (CollectionsUtil.isNotEmpty(oldPartitions)) {
Set<Integer> removedPartitions = Sets.difference(oldPartitions, newPartitions).stream()
.map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet());
if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) {
log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions);
}
repository.clearIf(tenantId -> {
Integer partition = partitionService.resolvePartition(tenantId);
return partition != null && removedPartitions.contains(partition);
});
}
repository.clearIf(tenantId -> {
Integer partition = partitionService.resolvePartition(tenantId);
return partition != null && removedPartitions.contains(partition);
});
} catch (Throwable t) {
log.error("Failed to handle partition change event {}", event, t);
}
@ -221,7 +227,11 @@ public class EdqsProcessor implements TbQueueHandler<TbProtoQueueMsg<ToEdqsMsg>,
}
EdqsObject object = converter.deserialize(objectType, eventMsg.getData().toByteArray());
log.info("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version);
log.debug("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version);
int count = counter.incrementAndGet();
if (count % 100000 == 0) {
log.info("Processed {} events", count);
}
EdqsEvent event = EdqsEvent.builder()
.tenantId(tenantId)

View File

@ -76,8 +76,7 @@ public class ApiUsageStateQueryProcessor extends AbstractSingleEntityTypeQueryPr
@Override
protected boolean matches(EntityData<?> ed) {
ApiUsageStateFields entityFields = (ApiUsageStateFields) ed.getFields();
return super.matches(ed) && (filter.getCustomerId() != null ? entityFields.getEntityId().equals(filter.getCustomerId()) :
entityFields.getEntityId().equals(repository.getTenantId()));
return super.matches(ed) && (filter.getCustomerId() == null || filter.getCustomerId().equals(entityFields.getEntityId()));
}
@Override

View File

@ -71,7 +71,8 @@ public class KafkaEdqsStateService implements EdqsStateService {
private ScheduledExecutorService scheduler;
private final VersionsStore versionsStore = new VersionsStore();
private final AtomicInteger restoredCount = new AtomicInteger();
private final AtomicInteger stateReadCount = new AtomicInteger();
private final AtomicInteger eventsReadCount = new AtomicInteger();
@PostConstruct
private void init() {
@ -79,7 +80,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-backup-consumer-mgmt");
scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-backup-scheduler");
stateConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>, QueueConfig>builder()
stateConsumer = MainQueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>, QueueConfig>builder() // FIXME Slavik: if topic is empty
.queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic()))
.config(QueueConfig.of(true, config.getPollInterval()))
.msgPackProcessor((msgs, consumer, config) -> {
@ -88,8 +89,8 @@ public class KafkaEdqsStateService implements EdqsStateService {
ToEdqsMsg msg = queueMsg.getValue();
log.trace("Processing message: {}", msg);
edqsProcessor.process(msg, EdqsQueue.STATE);
if (restoredCount.incrementAndGet() % 1000 == 0) {
log.info("Processed {} msgs", restoredCount.get());
if (stateReadCount.incrementAndGet() % 100000 == 0) {
log.info("[state] Processed {} msgs", stateReadCount.get());
}
} catch (Throwable t) {
log.error("Failed to process message: {}", queueMsg, t);
@ -103,7 +104,7 @@ public class KafkaEdqsStateService implements EdqsStateService {
.scheduler(scheduler)
.build();
eventsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>builder()
eventsConsumer = QueueConsumerManager.<TbProtoQueueMsg<ToEdqsMsg>>builder() // FIXME Slavik writes to the state while we read it, slows down the start
.name("edqs-events-to-backup-consumer")
.pollInterval(config.getPollInterval())
.msgPackProcessor((msgs, consumer) -> {
@ -115,6 +116,9 @@ public class KafkaEdqsStateService implements EdqsStateService {
if (msg.hasEventMsg()) {
EdqsEventMsg eventMsg = msg.getEventMsg();
String key = eventMsg.getKey();
if (eventsReadCount.incrementAndGet() % 100000 == 0) {
log.info("[events-to-backup] Processed {} msgs", eventsReadCount.get());
}
if (eventMsg.hasVersion()) {
if (!versionsStore.isNew(key, eventMsg.getVersion())) {
return;
@ -153,14 +157,14 @@ public class KafkaEdqsStateService implements EdqsStateService {
@Override
public void restore(Set<TopicPartitionInfo> partitions) {
restoredCount.set(0);
stateReadCount.set(0); //TODO Slavik: do not support remote mode in monolith setup
long startTs = System.currentTimeMillis();
log.info("Restore started for partitions {}", partitions.stream().map(tpi -> tpi.getPartition().orElse(-1)).sorted().toList());
stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update
stateConsumer.awaitStop(0); // consumers should stop on their own because EdqsQueue.STATE.stopWhenRead is true, we just need to wait
log.info("Restore finished in {} ms. Processed {} msgs", (System.currentTimeMillis() - startTs), restoredCount.get());
log.info("Restore finished in {} ms. Processed {} msgs", (System.currentTimeMillis() - startTs), stateReadCount.get());
}
@Override

View File

@ -482,7 +482,7 @@ public class HashPartitionService implements PartitionService {
private Set<TopicPartitionInfo> toTpiList(QueueKey queueKey, List<Integer> partitions) {
if (partitions == null) {
return null;
return Collections.emptySet();
}
return partitions.stream()
.map(partition -> buildTopicPartitionInfo(queueKey, partition))

View File

@ -144,10 +144,11 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
stopWatch.stop();
log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis());
List<ConsumerRecord<String, byte[]>> recordList;
if (records.isEmpty()) {
return Collections.emptyList();
recordList = Collections.emptyList();
} else {
List<ConsumerRecord<String, byte[]>> recordList = new ArrayList<>(256);
recordList = new ArrayList<>(256);
records.forEach(record -> {
recordList.add(record);
if (stopWhenRead) {
@ -163,17 +164,18 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
}
}
});
if (endOffsets != null && endOffsets.isEmpty()) {
log.info("Reached end offsets for {}, stopping consumer", consumer.assignment());
stop();
}
return recordList;
}
if (stopWhenRead && endOffsets.isEmpty()) {
log.info("Reached end offset for {}, stopping consumer", consumer.assignment());
stop();
}
return recordList;
}
private void onPartitionsAssigned() {
if (stopWhenRead) {
endOffsets = consumer.endOffsets(consumer.assignment()).entrySet().stream()
.filter(entry -> entry.getValue() > 0)
.collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue));
}
}

View File

@ -38,7 +38,7 @@ public class DefaultStatsFactory implements StatsFactory {
private static final Counter STUB_COUNTER = new StubCounter();
@Autowired
@Autowired(required = false) // FIXME Slavik !!!
private MeterRegistry meterRegistry;
@Value("${metrics.enabled:false}")

View File

@ -22,14 +22,13 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
/**
* @author Andrew Shvayka
@ -42,14 +41,14 @@ public interface AttributesDao {
List<AttributeKvEntry> findAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope);
PageData<AttributeKvEntity> findAll(PageLink pageLink);
ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, AttributeKvEntry attribute);
List<ListenableFuture<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys);
List<ListenableFuture<TbPair<String, Long>>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List<String> keys);
List<AttributeKvEntity> findNextBatch(UUID entityId, int attributeType, int attributeKey, int batchSize);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);

View File

@ -18,8 +18,6 @@ package org.thingsboard.server.dao.relation;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -43,8 +41,6 @@ public interface RelationDao {
List<EntityRelation> findAllByToAndType(TenantId tenantId, EntityId to, String relationType, RelationTypeGroup typeGroup);
PageData<EntityRelation> findAll(PageLink pageLink);
ListenableFuture<Boolean> checkRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
boolean checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);

View File

@ -59,4 +59,12 @@ public interface AttributeKvRepository extends JpaRepository<AttributeKvEntity,
"entity_id in :entityIds AND attribute_type = :attributeType ORDER BY attribute_key", nativeQuery = true)
List<Integer> findAllKeysByEntityIdsAndAttributeType(@Param("entityIds") List<UUID> entityIds,
@Param("attributeType") int attributeType);
@Query(value = "SELECT attribute_key, attribute_type, entity_id, bool_v, dbl_v, json_v, last_update_ts, long_v, str_v, version FROM attribute_kv WHERE (entity_id, attribute_type, attribute_key) > " +
"(:entityId, :attributeType, :attributeKey) ORDER BY entity_id, attribute_type, attribute_key LIMIT :batchSize", nativeQuery = true)
List<AttributeKvEntity> findNextBatch(@Param("entityId") UUID entityId,
@Param("attributeType") int attributeType,
@Param("attributeKey") int attributeKey,
@Param("batchSize") int batchSize);
}

View File

@ -23,7 +23,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.AttributeScope;
@ -31,8 +30,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
@ -51,8 +48,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -157,9 +154,8 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
}
@Override
public PageData<AttributeKvEntity> findAll(PageLink pageLink) {
Page<AttributeKvEntity> attributes = attributeKvRepository.findAll(DaoUtil.toPageable(pageLink));
return DaoUtil.pageToPageData(attributes);
public List<AttributeKvEntity> findNextBatch(UUID entityId, int attributeType, int attributeKey, int batchSize) {
return attributeKvRepository.findNextBatch(entityId, attributeType, attributeKey, batchSize);
}
@Override

View File

@ -693,6 +693,10 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
SELECT_ADDRESS + ", " + SELECT_ADDRESS_2 + ", " + SELECT_ZIP + ", " + SELECT_PHONE + ", " +
SELECT_ADDITIONAL_INFO + (entityFilter.isMultiRoot() ? (", " + SELECT_RELATED_PARENT_ID) : "") +
", entity.entity_type as entity_type";
/*
* FIXME:
* target entities are duplicated in result list, if search direction is TO and multiple relations are references to target entity
* */
String from = getQueryTemplate(entityFilter.getDirection(), entityFilter.isMultiRoot());
if (entityFilter.isMultiRoot()) {

View File

@ -24,8 +24,6 @@ import org.springframework.util.CollectionUtils;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.relation.EntityRelation;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.rule.RuleChainType;
@ -129,10 +127,6 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
typeGroup.name()));
}
@Override
public PageData<EntityRelation> findAll(PageLink pageLink) {
return DaoUtil.toPageData(relationRepository.findAll(DaoUtil.toPageable(pageLink)));
}
@Override
public ListenableFuture<Boolean> checkRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) {

View File

@ -85,6 +85,15 @@ public interface RelationRepository
@Query("DELETE FROM RelationEntity r where r.fromId = :fromId and r.fromType = :fromType and r.relationTypeGroup in :relationTypeGroups")
void deleteByFromIdAndFromTypeAndRelationTypeGroupIn(@Param("fromId") UUID fromId, @Param("fromType") String fromType, @Param("relationTypeGroups") List<String> relationTypeGroups);
@Query("SELECT e FROM RelationEntity e ORDER BY e.fromId, e.fromType, e.toId, e.toType, e.relationType, e.relationTypeGroup")
Page<RelationEntity> findAll(Pageable pageable);
@Query(value = "SELECT from_id, from_type, relation_type_group, relation_type, to_id, to_type, additional_info, version FROM relation" +
" WHERE (from_id, from_type, relation_type_group, relation_type, to_id, to_type) > " +
"(:fromId, :fromType, :relationTypeGroup, :relationType, :toId, :toType) ORDER BY " +
"from_id, from_type, relation_type_group, relation_type, to_id, to_type LIMIT :batchSize", nativeQuery = true)
List<RelationEntity> findNextBatch(@Param("fromId") UUID fromId,
@Param("fromType") String fromType,
@Param("relationTypeGroup") String relationTypeGroup,
@Param("relationType") String relationType,
@Param("toId") UUID toId,
@Param("toType") String toType,
@Param("batchSize") int batchSize);
}

View File

@ -171,9 +171,5 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
return sqlDao.findAllKeysByEntityIds(tenantId, entityIds);
}
@Override
public PageData<TsKvLatestEntity> findAllLatest(PageLink pageLink) {
return null;
}
}

View File

@ -188,10 +188,6 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
}
@Override
public PageData<TsKvLatestEntity> findAllLatest(PageLink pageLink) {
return DaoUtil.pageToPageData(tsKvLatestRepository.findAll(DaoUtil.toPageable(pageLink, "entityId", "key")));
}
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);

View File

@ -20,6 +20,7 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey;
import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity;
@ -46,4 +47,11 @@ public interface TsKvLatestRepository extends JpaRepository<TsKvLatestEntity, Ts
@Query("SELECT e FROM TsKvLatestEntity e ORDER BY e.entityId ASC, e.key ASC")
Page<TsKvLatestEntity> findAll(Pageable pageable);
@Query(value = "SELECT entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, version FROM ts_kv_latest WHERE (entity_id, key) > " +
"(:entityId, :key) ORDER BY entity_id, key LIMIT :batchSize", nativeQuery = true)
List<TsKvLatestEntity> findNextBatch(@Param("entityId") UUID entityId,
@Param("key") int key,
@Param("batchSize") int batchSize);
}

View File

@ -103,10 +103,6 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
return Collections.emptyList();
}
@Override
public PageData<TsKvLatestEntity> findAllLatest(PageLink pageLink) {
return null;
}
@Override
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {

View File

@ -54,5 +54,4 @@ public interface TimeseriesLatestDao {
List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);
PageData<TsKvLatestEntity> findAllLatest(PageLink pageLink);
}

View File

@ -30,7 +30,7 @@ import java.util.Arrays;
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.edqs", "org.thingsboard.server.queue.edqs", "org.thingsboard.server.queue.discovery", "org.thingsboard.server.queue.kafka",
"org.thingsboard.server.queue.settings", "org.thingsboard.server.queue.environment"})
"org.thingsboard.server.queue.settings", "org.thingsboard.server.queue.environment", "org.thingsboard.server.common.stats"})
@Slf4j
public class ThingsboardEdqsApplication {
@ -42,8 +42,8 @@ public class ThingsboardEdqsApplication {
}
// @Bean
public ApplicationRunner runner(CSVLoader loader, EdqRepository edqRepository) {
return args -> {
// public ApplicationRunner runner(CSVLoader loader, EdqRepository edqRepository) {
// return args -> {
// long startTs = System.currentTimeMillis();
// var loader = new TenantRepoLoader(new TenantRepo(TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60"))));
// loader.load();
@ -103,8 +103,8 @@ public class ThingsboardEdqsApplication {
// });
// Thread.sleep(5000);
// }
};
}
// };
// }
private static String[] updateArguments(String[] args) {
if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {

View File

@ -52,15 +52,14 @@ queue:
# For debug level
print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}"
edqs:
enabled: "${TB_EDQS_ENABLED:true}"
mode: "${TB_EDQS_MODE:local}"
partitions: "${TB_EDQS_PARTITIONS:12}"
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data
requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}"
responses_topic: "${TB_EDQS_RESPONSES_TOPIC:edqs.responses}"
poll_interval: "${TB_EDQS_POLL_INTERVAL_MS:125}"
max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}"
max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:10000}"
partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data
kafka:
# Kafka Bootstrap nodes in "host:port" format
bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}"
@ -171,11 +170,11 @@ queue:
# Kafka properties for Housekeeper reprocessing topic; retention.ms is set to 90 days; partitions is set to 1 since only one reprocessing service is running at a time
housekeeper-reprocessing: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_TOPIC_PROPERTIES:retention.ms:7776000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
# Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}"
# Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions
edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}"
# Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions
edqs-state: "${TB_QUEUE_KAFKA_EDQS_LATEST_EVENTS_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}"
consumer-stats:
# Prints lag between consumer group offset and last messages offset in Kafka topics
enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"