diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java index 7582d036e2..160d4bb565 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/DefaultEdqsService.java @@ -125,6 +125,7 @@ public class DefaultEdqsService implements EdqsService { executor.submit(() -> { try { EdqsSyncState syncState = getSyncState(); + // FIXME: Slavik smart events check if (edqsSyncService.isSyncNeeded() || syncState == null || syncState.getStatus() != EdqsSyncStatus.FINISHED) { if (hashPartitionService.isSystemPartitionMine(ServiceType.TB_CORE)) { processSystemRequest(ToCoreEdqsRequest.builder() diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsDataLoader.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsDataLoader.java deleted file mode 100644 index 69a3f6108d..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsDataLoader.java +++ /dev/null @@ -1,539 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.edqs; - -import com.fasterxml.jackson.databind.MappingIterator; -import com.fasterxml.jackson.dataformat.csv.CsvMapper; -import com.fasterxml.jackson.dataformat.csv.CsvSchema; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.ApiUsageState; -import org.thingsboard.server.common.data.AttributeScope; -import org.thingsboard.server.common.data.Customer; -import org.thingsboard.server.common.data.Dashboard; -import org.thingsboard.server.common.data.Device; -import org.thingsboard.server.common.data.DeviceProfile; -import org.thingsboard.server.common.data.DeviceProfileType; -import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.ObjectType; -import org.thingsboard.server.common.data.StringUtils; -import org.thingsboard.server.common.data.Tenant; -import org.thingsboard.server.common.data.User; -import org.thingsboard.server.common.data.asset.Asset; -import org.thingsboard.server.common.data.asset.AssetProfile; -import org.thingsboard.server.common.data.converter.Converter; -import org.thingsboard.server.common.data.converter.ConverterType; -import org.thingsboard.server.common.data.edge.Edge; -import org.thingsboard.server.common.data.edqs.AttributeKv; -import org.thingsboard.server.common.data.edqs.LatestTsKv; -import org.thingsboard.server.common.data.group.EntityGroup; -import org.thingsboard.server.common.data.id.ApiUsageStateId; -import org.thingsboard.server.common.data.id.AssetId; -import org.thingsboard.server.common.data.id.AssetProfileId; -import org.thingsboard.server.common.data.id.ConverterId; -import org.thingsboard.server.common.data.id.CustomerId; -import org.thingsboard.server.common.data.id.DashboardId; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.id.DeviceProfileId; -import org.thingsboard.server.common.data.id.EdgeId; -import org.thingsboard.server.common.data.id.EntityGroupId; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.EntityIdFactory; -import org.thingsboard.server.common.data.id.EntityViewId; -import org.thingsboard.server.common.data.id.IntegrationId; -import org.thingsboard.server.common.data.id.RoleId; -import org.thingsboard.server.common.data.id.RuleChainId; -import org.thingsboard.server.common.data.id.SchedulerEventId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.TenantProfileId; -import org.thingsboard.server.common.data.id.UserId; -import org.thingsboard.server.common.data.id.WidgetTypeId; -import org.thingsboard.server.common.data.id.WidgetsBundleId; -import org.thingsboard.server.common.data.integration.Integration; -import org.thingsboard.server.common.data.integration.IntegrationType; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; -import org.thingsboard.server.common.data.kv.BooleanDataEntry; -import org.thingsboard.server.common.data.kv.DoubleDataEntry; -import org.thingsboard.server.common.data.kv.JsonDataEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; -import org.thingsboard.server.common.data.kv.StringDataEntry; -import org.thingsboard.server.common.data.relation.EntityRelation; -import org.thingsboard.server.common.data.relation.RelationTypeGroup; -import org.thingsboard.server.common.data.role.Role; -import org.thingsboard.server.common.data.role.RoleType; -import org.thingsboard.server.common.data.rule.RuleChain; -import org.thingsboard.server.common.data.scheduler.SchedulerEvent; -import org.thingsboard.server.common.data.widget.WidgetType; -import org.thingsboard.server.common.data.widget.WidgetsBundle; -import org.thingsboard.server.common.msg.edqs.EdqsService; -import org.thingsboard.server.edqs.processor.EdqsConverter; - -import java.io.FileReader; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Consumer; - -import static org.thingsboard.common.util.JacksonUtil.toJsonNode; - - -@RequiredArgsConstructor -@Slf4j -//@Service -public class EdqsDataLoader { - - private final EdqsService edqsService; - private final EdqsConverter edqsConverter; - - public final static TenantId MAIN = TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60")); - - private final String folder = "/home/viacheslav/Downloads/schwarz"; - - private ExecutorService executor = Executors.newFixedThreadPool(5, ThingsBoardThreadFactory.forName("edqs-publisher")); - -// @AfterStartUp(order = 100) - public void load() throws Exception { - loadCustomers(); - loadDeviceProfile(); - loadDevices(); - loadAssets(); - loadEdges(); - loadEntityViews(); - loadTenants(); - loadUsers(); - loadDashboards(); - loadRuleChains(); - loadWidgetType(); - loadWidgetBundle(); - loadConverters(); - loadIntegrations(); - loadSchedulerEvents(); - loadRoles(); - loadApiUsageStates(); - loadAssetProfile(); - loadEntityGroups(); - loadRelations(); - - loadAttributes(); - loadTs(); - } - - private void loadCustomers() throws Exception { - load("customer.csv", (values) -> { - Customer customer = new Customer(); - customer.setTitle(values.get("title")); - customer.setId(new CustomerId(UUID.fromString(values.get("id")))); - customer.setCreatedTime(Long.parseLong(values.get("created_time"))); - customer.setTenantId(tenantId(values.get("tenant_id"))); - var parentCustomerId = values.get("parent_customer_id"); - if (StringUtils.isNotEmpty(parentCustomerId)) { - customer.setParentCustomerId(new CustomerId(UUID.fromString(parentCustomerId))); - } - edqsService.onUpdate(customer.getTenantId(), customer.getId(), customer); - }); - } - - private void loadDevices() throws Exception { - load("device.csv", (values) -> { - Device device = new Device(); - device.setType(values.get("type")); - device.setName(values.get("name")); - device.setLabel(values.get("label")); - device.setId(new DeviceId(uuid(values.get("id")))); - device.setCreatedTime(parseLong(values.get("created_time"))); - device.setCustomerId(customerId(values.get("customer_id"))); - device.setTenantId(tenantId(values.get("tenant_id"))); - device.setDeviceProfileId(new DeviceProfileId(uuid(values.get("device_profile_id")))); - device.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(device.getTenantId(), device.getId(), device); - }); - } - - private void loadAssets() throws Exception { - load("asset.csv", (values) -> { - Asset asset = new Asset(); - asset.setType(values.get("type")); - asset.setName(values.get("name")); - asset.setLabel(values.get("label")); - asset.setId(new AssetId(uuid(values.get("id")))); - asset.setCreatedTime(parseLong(values.get("created_time"))); - asset.setCustomerId(customerId(values.get("customer_id"))); - asset.setTenantId(tenantId(values.get("tenant_id"))); - asset.setAssetProfileId(new AssetProfileId(uuid(values.get("asset_profile_id")))); - asset.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(asset.getTenantId(), asset.getId(), asset); - }); - } - - private void loadEdges() throws Exception { - load("edge.csv", (values) -> { - Edge edge = new Edge(); - edge.setId(new EdgeId(uuid(values.get("id")))); - edge.setCreatedTime(parseLong(values.get("created_time"))); - edge.setType(values.get("type")); - edge.setName(values.get("name")); - edge.setLabel(values.get("label")); - edge.setCustomerId(customerId(values.get("customer_id"))); - edge.setTenantId(tenantId(values.get("tenant_id"))); - edge.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(edge.getTenantId(), edge.getId(), edge); - }); - } - - private void loadEntityViews() throws Exception { - load("entity_view.csv", (values) -> { - EntityView entityView = new EntityView(); - entityView.setId(new EntityViewId(uuid(values.get("id")))); - entityView.setCreatedTime(parseLong(values.get("created_time"))); - entityView.setType(values.get("type")); - entityView.setName(values.get("name")); - entityView.setCustomerId(customerId(values.get("customer_id"))); - entityView.setTenantId(tenantId(values.get("tenant_id"))); - entityView.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(entityView.getTenantId(), entityView.getId(), entityView); - }); - } - - private void loadTenants() throws Exception { - load("tenant.csv", (values) -> { - Tenant tenant = new Tenant(); - tenant.setId(new TenantId(uuid(values.get("id")))); - tenant.setCreatedTime(parseLong(values.get("created_time"))); - tenant.setEmail(values.get("email")); - tenant.setTitle(values.get("title")); - tenant.setCountry(values.get("country")); - tenant.setState(values.get("state")); - tenant.setCity(values.get("city")); - tenant.setAddress(values.get("address")); - tenant.setAddress2(values.get("address2")); - tenant.setZip(values.get("zip")); - tenant.setPhone(values.get("phone")); - tenant.setRegion(values.get("region")); - tenant.setTenantProfileId(new TenantProfileId(uuid(values.get("tenant_profile_id")))); - tenant.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - edqsService.onUpdate(MAIN, tenant.getId(), tenant); - }); - } - - private void loadUsers() throws Exception { - load("user.csv", (values) -> { - User user = new User(); - user.setId(new UserId(uuid(values.get("id")))); - user.setCreatedTime(parseLong(values.get("created_time"))); - user.setTenantId(tenantId(values.get("tenant_id"))); - user.setFirstName(values.get("first_name")); - user.setLastName(values.get("last_name")); - user.setEmail(values.get("email")); - user.setPhone(values.get("phone")); - user.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(user.getTenantId(), user.getId(), user); - }); - } - - private void loadDashboards() throws Exception { - load("dashboard.csv", (values) -> { - Dashboard dashboard = new Dashboard(); - dashboard.setId(new DashboardId(uuid(values.get("id")))); - dashboard.setCreatedTime(parseLong(values.get("created_time"))); - dashboard.setTenantId(tenantId(values.get("tenant_id"))); - dashboard.setTitle(values.get("title")); - - edqsService.onUpdate(dashboard.getTenantId(), dashboard.getId(), dashboard); - }); - } - - private void loadEntityGroups() throws Exception { - load("entity_group.csv", (values) -> { - EntityGroup entityGroup = new EntityGroup(); - entityGroup.setId(new EntityGroupId(uuid(values.get("id")))); - entityGroup.setCreatedTime(parseLong(values.get("created_time"))); - entityGroup.setName(values.get("name")); - entityGroup.setOwnerId(entityId(values.get("owner_type"), values.get("owner_id"))); - entityGroup.setType(EntityType.valueOf(values.get("type"))); - edqsService.onUpdate(MAIN, entityGroup.getId(), entityGroup); - }); - } - - private void loadRelations() throws Exception { - load("relation.csv", (values) -> { - EntityRelation entityRelation = new EntityRelation(); - entityRelation.setFrom(entityId(values.get("from_type"), values.get("from_id"))); - entityRelation.setTo(entityId(values.get("to_type"), values.get("to_id"))); - entityRelation.setTypeGroup(RelationTypeGroup.valueOf(values.get("relation_type_group"))); - entityRelation.setType(values.get("relation_type")); - edqsService.onUpdate(MAIN, ObjectType.RELATION, entityRelation); - }); - } - - private void loadRuleChains() throws Exception { - load("rule_chain.csv", (values) -> { - RuleChain ruleChain = new RuleChain(); - ruleChain.setId(new RuleChainId(uuid(values.get("id")))); - ruleChain.setCreatedTime(parseLong(values.get("created_time"))); - ruleChain.setName(values.get("name")); - ruleChain.setTenantId(tenantId(values.get("tenant_id"))); - ruleChain.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(ruleChain.getTenantId(), ruleChain.getId(), ruleChain); - }); - } - - private void loadWidgetType() throws Exception { - load("widget_type.csv", (values) -> { - WidgetType widgetType = new WidgetType(); - widgetType.setId(new WidgetTypeId(uuid(values.get("id")))); - widgetType.setCreatedTime(parseLong(values.get("created_time"))); - widgetType.setName(values.get("name")); - widgetType.setTenantId(tenantId(values.get("tenant_id"))); - - edqsService.onUpdate(widgetType.getTenantId(), widgetType.getId(), widgetType); - }); - } - - private void loadWidgetBundle() throws Exception { - load("widgets_bundle.csv", (values) -> { - WidgetsBundle widgetsBundle = new WidgetsBundle(); - widgetsBundle.setId(new WidgetsBundleId(uuid(values.get("id")))); - widgetsBundle.setCreatedTime(parseLong(values.get("created_time"))); - widgetsBundle.setTitle(values.get("title")); - widgetsBundle.setTenantId(tenantId(values.get("tenant_id"))); - - edqsService.onUpdate(widgetsBundle.getTenantId(), widgetsBundle.getId(), widgetsBundle); - }); - } - - private void loadConverters() throws Exception { - load("converter.csv", (values) -> { - Converter converter = new Converter(); - converter.setId(new ConverterId(uuid(values.get("id")))); - converter.setCreatedTime(parseLong(values.get("created_time"))); - converter.setName(values.get("name")); - converter.setType(ConverterType.valueOf(values.get("type"))); - converter.setTenantId(tenantId(values.get("tenant_id"))); - converter.setEdgeTemplate(parseBoolean(values.get("is_edge_template"))); - converter.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(converter.getTenantId(), converter.getId(), converter); - }); - } - - private void loadIntegrations() throws Exception { - load("integration.csv", (values) -> { - Integration integration = new Integration(); - integration.setId(new IntegrationId(uuid(values.get("id")))); - integration.setCreatedTime(parseLong(values.get("created_time"))); - integration.setName(values.get("name")); - integration.setType(IntegrationType.valueOf(values.get("type"))); - integration.setTenantId(tenantId(values.get("tenant_id"))); - integration.setEdgeTemplate(parseBoolean(values.get("is_edge_template"))); - integration.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(integration.getTenantId(), integration.getId(), integration); - }); - } - - private void loadSchedulerEvents() throws Exception { - load("scheduler_event.csv", (values) -> { - SchedulerEvent schedulerEvent = new SchedulerEvent(); - schedulerEvent.setId(new SchedulerEventId(uuid(values.get("id")))); - schedulerEvent.setCreatedTime(parseLong(values.get("created_time"))); - schedulerEvent.setName(values.get("name")); - schedulerEvent.setType(values.get("type")); - schedulerEvent.setTenantId(tenantId(values.get("tenant_id"))); - schedulerEvent.setConfiguration(toJsonNode(values.get("configuration"))); - schedulerEvent.setSchedule(toJsonNode(values.get("schedule"))); - schedulerEvent.setOriginatorId(entityId(values.get("originator_type"), values.get("originator_id"))); - schedulerEvent.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(schedulerEvent.getTenantId(), schedulerEvent.getId(), schedulerEvent); - }); - } - - private void loadRoles() throws Exception { - load("role.csv", (values) -> { - Role role = new Role(); - role.setId(new RoleId(uuid(values.get("id")))); - role.setCreatedTime(parseLong(values.get("created_time"))); - role.setName(values.get("name")); - role.setType(RoleType.valueOf(values.get("type"))); - role.setTenantId(tenantId(values.get("tenant_id"))); - role.setAdditionalInfo(toJsonNode(values.get("additional_info"))); - - edqsService.onUpdate(role.getTenantId(), role.getId(), role); - }); - } - - private void loadApiUsageStates() throws Exception { - load("api_usage_state.csv", (values) -> { - ApiUsageState apiUsageState = new ApiUsageState(); - apiUsageState.setId(new ApiUsageStateId(uuid(values.get("id")))); - apiUsageState.setCreatedTime(parseLong(values.get("created_time"))); - apiUsageState.setEntityId(entityId(values.get("entity_type"), values.get("entity_id"))); - apiUsageState.setTenantId(tenantId(values.get("tenant_id"))); - - edqsService.onUpdate(apiUsageState.getTenantId(), apiUsageState.getId(), apiUsageState); - }); - } - - private void loadDeviceProfile() throws Exception { - load("device_profile.csv", (values) -> { - DeviceProfile deviceProfile = new DeviceProfile(); - deviceProfile.setId(new DeviceProfileId(uuid(values.get("id")))); - deviceProfile.setCreatedTime(parseLong(values.get("created_time"))); - deviceProfile.setName(values.get("name")); - deviceProfile.setType(DeviceProfileType.valueOf(values.get("type"))); - deviceProfile.setTenantId(tenantId(values.get("tenant_id"))); - - edqsService.onUpdate(deviceProfile.getTenantId(), deviceProfile.getId(), deviceProfile); - }); - } - - private void loadAssetProfile() throws Exception { - load("asset_profile.csv", (values) -> { - AssetProfile assetProfile = new AssetProfile(); - assetProfile.setId(new AssetProfileId(uuid(values.get("id")))); - assetProfile.setCreatedTime(parseLong(values.get("created_time"))); - assetProfile.setName(values.get("name")); - assetProfile.setTenantId(tenantId(values.get("tenant_id"))); - - edqsService.onUpdate(assetProfile.getTenantId(), assetProfile.getId(), assetProfile); - }); - } - - private void loadAttributes() throws Exception { - load("attribute.csv", (values) -> { - EntityId entityId = EntityIdFactory.getByTypeAndId(values.get("entity_type"), values.get("entity_id")); - long ts = parseLong(values.get("last_update_ts")); - AttributeScope scope = AttributeScope.valueOf(values.get("attribute_type")); - String key = values.get("attribute_key"); - KvEntry kvEntry; - if (StringUtils.isNotEmpty(values.get("bool_v"))) { - kvEntry = new BooleanDataEntry(key, "t".equals(values.get("bool_v"))); - } else if (StringUtils.isNotEmpty(values.get("str_v"))) { - kvEntry = new StringDataEntry(key, values.get("str_v")); - } else if (StringUtils.isNotEmpty(values.get("long_v"))) { - kvEntry = new LongDataEntry(key, parseLong(values.get("long_v"))); - } else if (StringUtils.isNotEmpty(values.get("dbl_v"))) { - kvEntry = new DoubleDataEntry(key, Double.parseDouble(values.get("dbl_v"))); - } else if (StringUtils.isNotEmpty(values.get("json_v"))) { - kvEntry = new JsonDataEntry(key, values.get("json_v")); - } else { - kvEntry = new StringDataEntry(key, ""); - } - AttributeKvEntry attributeKvEntry = new BaseAttributeKvEntry(ts, kvEntry); - AttributeKv attributeKv = new AttributeKv(entityId, scope, attributeKvEntry, 0); - edqsService.onUpdate(MAIN, ObjectType.ATTRIBUTE_KV, attributeKv); - }); - } - - private void loadTs() throws Exception { - load("ts_kv.csv", (values) -> { - var entityTypeStr = values.get("find_entity_type"); - if (StringUtils.isEmpty(entityTypeStr)) { - return; - } - EntityId entityId = EntityIdFactory.getByTypeAndId(values.get("find_entity_type"), values.get("entity_id")); - long ts = parseLong(values.get("ts")); - String key = values.get("key"); - KvEntry kvEntry; - if (StringUtils.isNotEmpty(values.get("bool_v"))) { - kvEntry = new BooleanDataEntry(key, "t".equals(values.get("bool_v"))); - } else if (StringUtils.isNotEmpty(values.get("str_v"))) { - kvEntry = new StringDataEntry(key, values.get("str_v")); - } else if (StringUtils.isNotEmpty(values.get("long_v"))) { - kvEntry = new LongDataEntry(key, parseLong(values.get("long_v"))); - } else if (StringUtils.isNotEmpty(values.get("dbl_v"))) { - kvEntry = new DoubleDataEntry(key, Double.parseDouble(values.get("dbl_v"))); - } else if (StringUtils.isNotEmpty(values.get("json_v"))) { - kvEntry = new JsonDataEntry(key, values.get("json_v")); - } else { - kvEntry = new StringDataEntry(key, ""); - } - BasicTsKvEntry tsKvEntry = new BasicTsKvEntry(ts, kvEntry); - edqsService.onUpdate(MAIN, ObjectType.LATEST_TS_KV, new LatestTsKv(entityId, tsKvEntry, 0L)); - }); - } - - private void load(String file, Consumer> function) throws Exception { - Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("loader-" + file)).submit(() -> { - try { - long ts = System.currentTimeMillis(); - CsvSchema schema = CsvSchema.emptySchema().withHeader().withColumnSeparator('|'); - CsvMapper mapper = new CsvMapper(); - MappingIterator> it = mapper - .readerFor(Map.class) - .with(schema) - .readValues(new FileReader(folder + "/" + file)); - - int success = 0; - int failure = 0; - while (it.hasNextValue()) { - Map row = it.nextValue(); - try { - function.accept(row); - success++; - if (success % 1000 == 0) { - log.info("Loaded [{}] from [{}]", success, file); - } - } catch (Exception e) { - log.error("Failed to parse str: [{}]", row, e); - failure++; - } - } - log.info("Loaded [{}] from [{}] in {}ms. Failures {}", success, file, (System.currentTimeMillis() - ts), failure); - } catch (Throwable t) { - log.error("Failed to load data from [{}]", file, t); - } - }); - } - - private static TenantId tenantId(String id) { - return TenantId.fromUUID(UUID.fromString(id)); - } - - private static CustomerId customerId(String id) { - var c = new CustomerId(UUID.fromString(id)); - return c.isNullUid() ? null : c; - } - - private static EntityId entityId(String type, String id) { - return EntityIdFactory.getByTypeAndId(type, id); - } - - private static UUID uuid(String id) { - return UUID.fromString(id); - } - - private static long parseLong(String time) { - return Long.parseLong(time); - } - - private static boolean parseBoolean(String value) { - return Boolean.parseBoolean(value); - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java index 8e12da56e0..bf8e096059 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/EdqsSyncService.java @@ -32,8 +32,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageDataIterable; -import org.thingsboard.server.common.data.page.SortOrder; -import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.dao.Dao; import org.thingsboard.server.dao.attributes.AttributesDao; @@ -41,13 +39,15 @@ import org.thingsboard.server.dao.dictionary.KeyDictionaryDao; import org.thingsboard.server.dao.entity.EntityDaoRegistry; import org.thingsboard.server.dao.group.EntityGroupDao; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; +import org.thingsboard.server.dao.model.sql.RelationEntity; import org.thingsboard.server.dao.model.sqlts.dictionary.KeyDictionaryEntry; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; -import org.thingsboard.server.dao.relation.RelationDao; +import org.thingsboard.server.dao.sql.relation.RelationRepository; +import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; import org.thingsboard.server.dao.tenant.TenantDao; -import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -92,11 +92,11 @@ public abstract class EdqsSyncService { @Autowired private KeyDictionaryDao keyDictionaryDao; @Autowired - private RelationDao relationDao; + private RelationRepository relationRepository; @Autowired private EntityGroupDao entityGroupDao; @Autowired - private TimeseriesLatestDao timeseriesLatestDao; + private TsKvLatestRepository tsKvLatestRepository; @Autowired @Lazy private DefaultEdqsService edqsService; @@ -171,7 +171,7 @@ public abstract class EdqsSyncService { private void syncEntityGroups() { log.info("Synchronizing entity groups to EDQS"); long ts = System.currentTimeMillis(); - var entityGroups = new PageDataIterable<>(entityGroupDao::findAllFields, 10000); + var entityGroups = new PageDataIterable<>(entityGroupDao::findAllFields, 30000); for (EntityFields groupFields : entityGroups) { EntityIdInfo entityIdInfo = entityInfoMap.get(groupFields.getOwnerId()); if (entityIdInfo != null) { @@ -187,18 +187,43 @@ public abstract class EdqsSyncService { private void syncRelations() { log.info("Synchronizing relations to EDQS"); long ts = System.currentTimeMillis(); - var relations = new PageDataIterable<>(relationDao::findAll, 10000); - for (EntityRelation relation : relations) { - if (relation.getTypeGroup() == RelationTypeGroup.COMMON || relation.getTypeGroup() == RelationTypeGroup.FROM_ENTITY_GROUP) { - EntityIdInfo entityIdInfo = entityInfoMap.get(relation.getFrom().getId()); + UUID lastFromEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000"); + String lastFromEntityType = ""; + String lastRelationTypeGroup = ""; + String lastRelationType = ""; + UUID lastToEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000"); + String lastToEntityType = ""; + + while (true) { + List batch = relationRepository.findNextBatch(lastFromEntityId, lastFromEntityType, lastRelationTypeGroup, + lastRelationType, lastToEntityId, lastToEntityType, 10000); + if (batch.isEmpty()) { + break; + } + processRelationBatch(batch); + + RelationEntity lastRecord = batch.get(batch.size() - 1); + lastFromEntityId = lastRecord.getFromId(); + lastFromEntityType = lastRecord.getFromType(); + lastRelationTypeGroup = lastRecord.getRelationTypeGroup(); + lastRelationType = lastRecord.getRelationType(); + lastToEntityId = lastRecord.getToId(); + lastToEntityType = lastRecord.getToType(); + } + log.info("Finished synchronizing relations to EDQS in {} ms", (System.currentTimeMillis() - ts)); + } + + private void processRelationBatch(List relations) { + for (RelationEntity relation : relations) { + if (RelationTypeGroup.COMMON.name().equals(relation.getRelationTypeGroup()) || (RelationTypeGroup.FROM_ENTITY_GROUP.name().equals(relation.getRelationTypeGroup()))) { + EntityIdInfo entityIdInfo = entityInfoMap.get(relation.getFromId()); if (entityIdInfo != null) { - process(entityIdInfo.tenantId(), RELATION, relation); + process(entityIdInfo.tenantId(), RELATION, relation.toData()); } else { - log.info("Relation from entity not found: " + relation.getFrom()); + log.info("Relation from entity not found: " + relation.getFromId()); } } } - log.info("Finished synchronizing relations to EDQS in {} ms", (System.currentTimeMillis() - ts)); } private void loadKeyDictionary() { @@ -214,8 +239,28 @@ public abstract class EdqsSyncService { private void syncAttributes() { log.info("Synchronizing attributes to EDQS"); long ts = System.currentTimeMillis(); - var attributes = new PageDataIterable<>(attributesDao::findAll, 10000); - for (AttributeKvEntity attribute : attributes) { + + UUID lastEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000"); + int lastAttributeType = Integer.MIN_VALUE; + int lastAttributeKey = Integer.MIN_VALUE; + + while (true) { + List batch = attributesDao.findNextBatch(lastEntityId, lastAttributeType, lastAttributeKey, 10000); + if (batch.isEmpty()) { + break; + } + processAttributeBatch(batch); + + AttributeKvEntity lastRecord = batch.get(batch.size() - 1); + lastEntityId = lastRecord.getId().getEntityId(); + lastAttributeType = lastRecord.getId().getAttributeType(); + lastAttributeKey = lastRecord.getId().getAttributeKey(); + } + log.info("Finished synchronizing attributes to EDQS in {} ms", (System.currentTimeMillis() - ts)); + } + + private void processAttributeBatch(List batch) { + for (AttributeKvEntity attribute : batch) { attribute.setStrKey(getStrKeyOrFetchFromDb(attribute.getId().getAttributeKey())); UUID entityId = attribute.getId().getEntityId(); EntityIdInfo entityIdInfo = entityInfoMap.get(entityId); @@ -230,13 +275,29 @@ public abstract class EdqsSyncService { attribute.getVersion()); process(entityIdInfo.tenantId(), ATTRIBUTE_KV, attributeKv); } - log.info("Finished synchronizing attributes to EDQS in {} ms", (System.currentTimeMillis() - ts)); } private void syncLatestTimeseries() { log.info("Synchronizing latest timeseries to EDQS"); long ts = System.currentTimeMillis(); - var tsKvLatestEntities = new PageDataIterable<>(pageLink -> timeseriesLatestDao.findAllLatest(pageLink), 10000); + UUID lastEntityId = UUID.fromString("00000000-0000-0000-0000-000000000000"); + int lastKey = Integer.MIN_VALUE; + + while (true) { + List batch = tsKvLatestRepository.findNextBatch(lastEntityId, lastKey, 10000); + if (batch.isEmpty()) { + break; + } + processTsKvLatestBatch(batch); + + TsKvLatestEntity lastRecord = batch.get(batch.size() - 1); + lastEntityId = lastRecord.getEntityId(); + lastKey = lastRecord.getKey(); + } + log.info("Finished synchronizing latest timeseries to EDQS in {} ms", (System.currentTimeMillis() - ts)); + } + + private void processTsKvLatestBatch(List tsKvLatestEntities) { for (TsKvLatestEntity tsKvLatestEntity : tsKvLatestEntities) { try { String strKey = getStrKeyOrFetchFromDb(tsKvLatestEntity.getKey()); @@ -256,7 +317,6 @@ public abstract class EdqsSyncService { log.error("Failed to sync latest timeseries: {}", tsKvLatestEntity, e); } } - log.info("Finished synchronizing latest timeseries to EDQS in {} ms", (System.currentTimeMillis() - ts)); } private String getStrKeyOrFetchFromDb(int key) { @@ -265,7 +325,9 @@ public abstract class EdqsSyncService { return strKey; } else { strKey = keyDictionaryDao.getKey(key); - keys.putIfAbsent(key, strKey); + if (strKey != null) { + keys.put(key, strKey); + } } return strKey; } diff --git a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java index d51d87ed01..f4e9a02b45 100644 --- a/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java +++ b/application/src/main/java/org/thingsboard/server/service/edqs/KafkaEdqsSyncService.java @@ -40,7 +40,7 @@ public class KafkaEdqsSyncService extends EdqsSyncService { @Override public boolean isSyncNeeded() { - return kafkaAdmin.isTopicEmpty(EdqsQueue.STATE.getTopic()); + return kafkaAdmin.isTopicEmpty(EdqsQueue.EVENTS.getTopic()); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 9b182ce3a7..dd0e5948c7 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -1616,11 +1616,11 @@ queue: # Kafka properties for Edge event topic edge-event: "${TB_QUEUE_KAFKA_EDGE_EVENT_TOPIC_PROPERTIES:retention.ms:2592000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_LATEST_EVENTS_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" @@ -1701,6 +1701,7 @@ queue: local: rocksdb_path: "${TB_EDQS_ROCKSDB_PATH:/tmp/edqs-backup}" partitions: "${TB_EDQS_PARTITIONS:12}" + partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}" responses_topic: "${TB_EDQS_RESPONSES_TOPIC:edqs.responses}" poll_interval: "${TB_EDQS_POLL_INTERVAL_MS:125}" diff --git a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java index dc29cec3cc..04bad95be7 100644 --- a/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/entitiy/EntityServiceTest.java @@ -210,7 +210,6 @@ public class EntityServiceTest extends AbstractControllerTest { countByQueryAndCheck(countQuery, 0); } - @Test public void testCountHierarchicalEntitiesByQuery() throws InterruptedException { List assets = new ArrayList<>(); @@ -463,6 +462,7 @@ public class EntityServiceTest extends AbstractControllerTest { deviceService.deleteDevicesByTenantId(tenantId); } + // fails for sql implementation until we fix the issue with the relation query @Test public void testCountHierarchicalEntitiesByMultiRootQuery() throws InterruptedException { List buildings = new ArrayList<>(); @@ -1443,18 +1443,29 @@ public class EntityServiceTest extends AbstractControllerTest { String deviceName = result.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue(); assertThat(deviceName).isEqualTo(customerDevices.get(0).getName()); + // find by customer user with generic permission + MergedUserPermissions mergedGenericPermission = new MergedUserPermissions(Map.of(Resource.DEVICE, Set.of(Operation.READ)), Collections.emptyMap()); + PageData customerResults = findByQueryAndCheck(customerId, mergedGenericPermission, query, 1); + + String cutomerDeviceName = customerResults.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue(); + assertThat(cutomerDeviceName).isEqualTo(customerDevices.get(0).getName()); + + // find by customer user with group permission + MergedUserPermissions mergedGroupOnlyPermission = new MergedUserPermissions(Collections.emptyMap(), Map.of(customerDeviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.READ)))); + PageData result2 = findByQueryAndCheck(customerId, mergedGroupOnlyPermission, query, 1); + + String resultDeviceName2 = result2.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue(); + assertThat(resultDeviceName2).isEqualTo(customerDevices.get(0).getName()); + // try to find tenant device by customer user SingleEntityFilter tenantDeviceFilter = new SingleEntityFilter(); tenantDeviceFilter.setSingleEntity(tenantDevices.get(0).getId()); EntityDataQuery customerQuery2 = new EntityDataQuery(tenantDeviceFilter, pageLink, entityFields, null, null); - PageData customerResults2 = entityService.findEntityDataByQuery(tenantId, customerId, customerQuery2); - - assertEquals(0, customerResults2.getTotalElements()); + findByQueryAndCheck(customerId, mergedGenericPermission, customerQuery2, 0); // find by tenant user with group permission - PageData results3 = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query); + PageData results3 = findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), mergedGroupOnlyPermission, query, 1); - assertEquals(1, results3.getTotalElements()); String deviceName3 = results3.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue(); assertThat(deviceName3).isEqualTo(customerDevices.get(0).getName()); } @@ -1488,16 +1499,14 @@ public class EntityServiceTest extends AbstractControllerTest { // find by customer user with generic permissions apiUsageStateService.createDefaultApiUsageState(tenantId, customerId); - PageData customerResult = entityService.findEntityDataByQuery(tenantId, customerId, query); + PageData customerResult = findByQueryAndCheck(customerId, query, 1); - assertEquals(1, customerResult.getTotalElements()); String customerResultName = customerResult.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue(); assertThat(customerResultName).isEqualTo(TEST_CUSTOMER_NAME); // find by tenant user with customerId filter apiUsageStateFilter.setCustomerId(customerId); - PageData tenantResult = searchEntities(query); - assertEquals(1, tenantResult.getTotalElements()); + PageData tenantResult = findByQueryAndCheck(query, 1); String tenantResultName = tenantResult.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue(); assertThat(tenantResultName).isEqualTo(TEST_CUSTOMER_NAME); } @@ -1625,6 +1634,231 @@ public class EntityServiceTest extends AbstractControllerTest { deviceService.deleteDevicesByTenantId(tenantId); } + @Test + public void testFindEntityDataByRelationQuery_blobEntity_customerLevel() { + final int deviceCnt = 2; + final int relationsCnt = 3; + final int blobEntitiesCnt = deviceCnt * relationsCnt; + + Customer customer = new Customer(); + customer.setTenantId(tenantId); + customer.setTitle("Customer Relation Query"); + customer = customerService.saveCustomer(customer); + + List devices = new ArrayList<>(); + for (int i = 0; i < deviceCnt; i++) { + Device device = new Device(); + device.setTenantId(tenantId); + device.setName("Device relation query " + i); + device.setCustomerId(customer.getId()); + device.setType("default"); + devices.add(deviceService.saveDevice(device)); + } + + List blobEntities = new ArrayList<>(); + for (int i = 0; i < blobEntitiesCnt; i++) { + BlobEntity blobEntity = new BlobEntity(); + blobEntity.setName("Blob relation query " + i); + blobEntity.setTenantId(tenantId); + blobEntity.setContentType("image/png"); + blobEntity.setData(ByteBuffer.allocate(1024)); + blobEntity.setCustomerId(customer.getId()); + blobEntity.setType("Report"); + blobEntities.add(blobEntityService.saveBlobEntity(blobEntity)); + } + + for (int i = 0; i < deviceCnt; i++) { + for (int j = 0; j < relationsCnt; j++) { + EntityRelation relationEntity = new EntityRelation(); + relationEntity.setFrom(devices.get(i).getId()); + relationEntity.setTo(blobEntities.get(j + (i * relationsCnt)).getId()); + relationEntity.setTypeGroup(RelationTypeGroup.COMMON); + relationEntity.setType("fileAttached"); + relationService.saveRelation(tenantId, relationEntity); + } + } + + MergedUserPermissions mergedUserPermissions = new MergedUserPermissions(Map.of(ALL, Set.of(Operation.ALL)), Collections.emptyMap()); + + RelationEntityTypeFilter relationEntityTypeFilter = new RelationEntityTypeFilter("fileAttached", Collections.singletonList(EntityType.BLOB_ENTITY)); + RelationsQueryFilter filter = new RelationsQueryFilter(); + filter.setFilters(Collections.singletonList(relationEntityTypeFilter)); + filter.setDirection(EntitySearchDirection.FROM); + EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null); + + for (Device device : devices) { + filter.setRootEntity(device.getId()); + + EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + findByQueryAndCheck(customer.getId(), mergedUserPermissions, query, relationsCnt); + countByQueryAndCheck(customer.getId(), mergedUserPermissions, query, relationsCnt); + /* + In order to be careful with updating Relation Query while adding new Entity Type, + this checkup will help to find place, where you could check the correctness of building query + */ + Assert.assertEquals(38, EntityType.values().length); + } + } + + @Test + public void testFindEntitiesByRelationEntityTypeFilterWithTenantGroupPermission() { + final int assetCount = 2; + final int relationsCnt = 4; + final int deviceEntitiesCnt = assetCount * relationsCnt; + + EntityGroup deviceGroup = new EntityGroup(); + deviceGroup.setName("Device Tenant Level Group"); + deviceGroup.setOwnerId(tenantId); + deviceGroup.setTenantId(tenantId); + deviceGroup.setType(EntityType.DEVICE); + deviceGroup = entityGroupService.saveEntityGroup(tenantId, tenantId, deviceGroup); + + List assets = new ArrayList<>(); + for (int i = 0; i < assetCount; i++) { + Asset building = new Asset(); + building.setTenantId(tenantId); + building.setName("Building _" + i); + building.setType("building"); + building = assetService.saveAsset(building); + assets.add(building); + } + + List devices = new ArrayList<>(); + for (int i = 0; i < deviceEntitiesCnt; i++) { + Device device = new Device(); + device.setTenantId(tenantId); + device.setName("Test device " + i); + device.setType("default"); + Device savedDevice = deviceService.saveDevice(device); + devices.add(savedDevice); + if (i % 2 == 0) { + entityGroupService.addEntityToEntityGroup(tenantId, deviceGroup.getId(), savedDevice.getId()); + } + } + + for (int i = 0; i < assetCount; i++) { + for (int j = 0; j < relationsCnt; j++) { + EntityRelation relationEntity = new EntityRelation(); + relationEntity.setFrom(assets.get(i).getId()); + relationEntity.setTo(devices.get(j + (i * relationsCnt)).getId()); + relationEntity.setTypeGroup(RelationTypeGroup.COMMON); + relationEntity.setType("contains"); + relationService.saveRelation(tenantId, relationEntity); + } + } + + MergedUserPermissions groupOnlyPermission = new MergedUserPermissions(Collections.emptyMap(), + Map.of(deviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.READ)))); + + RelationEntityTypeFilter relationEntityTypeFilter = new RelationEntityTypeFilter("contains", Collections.singletonList(EntityType.DEVICE)); + RelationsQueryFilter filter = new RelationsQueryFilter(); + filter.setFilters(Collections.singletonList(relationEntityTypeFilter)); + filter.setDirection(EntitySearchDirection.FROM); + EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null); + List keyFiltersEqualString = createStringKeyFilters("name", EntityKeyType.ENTITY_FIELD, StringOperation.STARTS_WITH, "Test device "); + + for (Asset asset : assets) { + filter.setRootEntity(asset.getId()); + + EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), keyFiltersEqualString); + findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), groupOnlyPermission, query, relationsCnt / 2); + countByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), groupOnlyPermission, query, relationsCnt / 2); + } + } + + @Test + public void testFindEntitiesWithRelationEntityTypeFilterByCustomerUser() { + Customer customer = new Customer(); + customer.setTenantId(tenantId); + customer.setTitle("Customer Relation Query"); + customer = customerService.saveCustomer(customer); + + final int assetCount = 2; + final int relationsCnt = 4; + final int deviceEntitiesCnt = assetCount * relationsCnt; + + EntityGroup deviceGroup = new EntityGroup(); + deviceGroup.setName("Device Tenant Level Group"); + deviceGroup.setOwnerId(customer.getId()); + deviceGroup.setTenantId(tenantId); + deviceGroup.setType(EntityType.DEVICE); + deviceGroup = entityGroupService.saveEntityGroup(tenantId, tenantId, deviceGroup); + + List assets = new ArrayList<>(); + for (int i = 0; i < assetCount; i++) { + Asset building = new Asset(); + building.setTenantId(tenantId); + building.setCustomerId(customer.getId()); + building.setName("Building _" + i); + building.setType("building"); + building = assetService.saveAsset(building); + assets.add(building); + } + + List devices = new ArrayList<>(); + for (int i = 0; i < deviceEntitiesCnt; i++) { + Device device = new Device(); + device.setTenantId(tenantId); + device.setCustomerId(customer.getId()); + device.setName("Test device " + i); + device.setType("default"); + Device savedDevice = deviceService.saveDevice(device); + devices.add(savedDevice); + if (i % 2 == 0) { + entityGroupService.addEntityToEntityGroup(tenantId, deviceGroup.getId(), savedDevice.getId()); + } + } + + for (int i = 0; i < assetCount; i++) { + for (int j = 0; j < relationsCnt; j++) { + EntityRelation relationEntity = new EntityRelation(); + relationEntity.setFrom(assets.get(i).getId()); + relationEntity.setTo(devices.get(j + (i * relationsCnt)).getId()); + relationEntity.setTypeGroup(RelationTypeGroup.COMMON); + relationEntity.setType("contains"); + relationService.saveRelation(tenantId, relationEntity); + } + } + + MergedUserPermissions mergedGroupOnlyPermission = new MergedUserPermissions(Collections.emptyMap(), Map.of(deviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.ALL)))); + MergedUserPermissions mergedGenericOnlyPermission = new MergedUserPermissions(Map.of(Resource.ALL, Set.of(Operation.ALL)), Collections.emptyMap()); + MergedUserPermissions mergedGenericAndGroupPermission = new MergedUserPermissions(Map.of(Resource.ALL, Set.of(Operation.ALL)), Map.of(deviceGroup.getId(), new MergedGroupPermissionInfo(EntityType.DEVICE, Set.of(Operation.ALL)))); + RelationEntityTypeFilter relationEntityTypeFilter = new RelationEntityTypeFilter("contains", Collections.singletonList(EntityType.DEVICE)); + RelationsQueryFilter filter = new RelationsQueryFilter(); + filter.setFilters(Collections.singletonList(relationEntityTypeFilter)); + filter.setDirection(EntitySearchDirection.FROM); + EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null); + List keyFiltersEqualString = createStringKeyFilters("name", EntityKeyType.ENTITY_FIELD, StringOperation.STARTS_WITH, "Test device "); + + EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), keyFiltersEqualString); + + for (Asset asset : assets) { + filter.setRootEntity(asset.getId()); + + //check by user with generic permission + PageData relationsResult = findByQueryAndCheck(customer.getId(), mergedGenericOnlyPermission, query, relationsCnt); + countByQueryAndCheck(customer.getId(), mergedGenericOnlyPermission, query, relationsCnt); + + //check by user with generic and group permission + PageData relationsResult1 = findByQueryAndCheck(customer.getId(), mergedGenericAndGroupPermission, query, relationsCnt); + countByQueryAndCheck(customer.getId(), mergedGenericAndGroupPermission, query, relationsCnt); + + //check by other customer user with group only permission + PageData relationsResult2 = findByQueryAndCheck(otherCustomerId, mergedGroupOnlyPermission, query, relationsCnt / 2); + long relationsResultCnt2 = countByQueryAndCheck(otherCustomerId, mergedGroupOnlyPermission, query, relationsCnt / 2); + + Assert.assertEquals(relationsCnt / 2, relationsResult2.getData().size()); + Assert.assertEquals(relationsCnt / 2, relationsResultCnt2); + + //check by other customer user with generic and group only permission + PageData relationsResult3 = findByQueryAndCheck(otherCustomerId, mergedGenericAndGroupPermission, query, relationsCnt / 2); + long relationsResultCnt3 = countByQueryAndCheck(otherCustomerId, mergedGenericAndGroupPermission, query, relationsCnt / 2); + + Assert.assertEquals(relationsCnt / 2, relationsResult3.getData().size()); + Assert.assertEquals(relationsCnt / 2, relationsResultCnt3); + } + } + @Test public void testBuildNumericPredicateQueryOperations() throws ExecutionException, InterruptedException { @@ -2422,7 +2656,7 @@ public class EntityServiceTest extends AbstractControllerTest { return timeseriesService.save(tenantId, entityId, timeseries); } - private void createMultiRootHierarchy(List buildings, List apartments, + protected void createMultiRootHierarchy(List buildings, List apartments, Map> entityNameByTypeMap, Map childParentRelationMap) throws InterruptedException { for (int k = 0; k < 3; k++) { @@ -2507,7 +2741,7 @@ public class EntityServiceTest extends AbstractControllerTest { entityView.setEndTimeMs(256); entityView.setExternalId(new EntityViewId(UUID.randomUUID())); entityView.setAdditionalInfo(JacksonUtil.newObjectNode().put("test", "test")); - entityView = entityViewDao.save(tenantId, entityView); + entityView = entityViewService.saveEntityView(entityView); EntityViewTypeFilter entityViewTypeFilter = new EntityViewTypeFilter(); entityViewTypeFilter.setEntityViewNameFilter("test"); @@ -2518,21 +2752,18 @@ public class EntityServiceTest extends AbstractControllerTest { ); EntityDataQuery query = new EntityDataQuery(entityViewTypeFilter, pageLink, entityFields, Collections.emptyList(), null); - PageData relationsResult = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query); - assertThat(relationsResult.getData()).hasSize(1); + PageData relationsResult = findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), query, 1); assertThat(relationsResult.getData().get(0).getLatest().get(EntityKeyType.ENTITY_FIELD).get("name").getValue()).isEqualTo(entityView.getName()); // find with non existing name entityViewTypeFilter.setEntityViewNameFilter("non-existing"); - PageData relationsResult2 = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query); - assertThat(relationsResult2.getData()).hasSize(0); + findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), query, 0); // find with non existing type entityViewTypeFilter.setEntityViewNameFilter(null); entityViewTypeFilter.setEntityViewTypes(Collections.singletonList("non-existing")); - PageData relationsResult3 = entityService.findEntityDataByQuery(tenantId, new CustomerId(EntityId.NULL_UUID), query); - assertThat(relationsResult3.getData()).hasSize(0); + findByQueryAndCheck(new CustomerId(EntityId.NULL_UUID), query, 0); } private PageData findByQuery(EntityDataQuery query) { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/FieldsUtil.java b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/FieldsUtil.java index 5a0db392ef..f1cf51bf8b 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/FieldsUtil.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edqs/fields/FieldsUtil.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileType; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantProfile; @@ -275,6 +276,7 @@ public class FieldsUtil { return ApiUsageStateFields.builder() .id(entity.getUuidId()) .createdTime(entity.getCreatedTime()) + .customerId(entity.getEntityId().getEntityType() == EntityType.CUSTOMER ? entity.getEntityId().getId() : null) .entityId(entity.getEntityId()) .transportState(entity.getTransportState()) .dbStorageState(entity.getDbStorageState()) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java index 5383311e1f..24526205d8 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/processor/EdqsProcessor.java @@ -42,10 +42,11 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.queue.QueueConfig; +import org.thingsboard.server.common.data.util.CollectionsUtil; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.edqs.state.EdqsStateService; import org.thingsboard.server.edqs.repo.EdqRepository; +import org.thingsboard.server.edqs.state.EdqsStateService; import org.thingsboard.server.edqs.util.EdqsPartitionService; import org.thingsboard.server.edqs.util.VersionsStore; import org.thingsboard.server.gen.transport.TransportProtos; @@ -70,6 +71,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @EdqsComponent @@ -96,6 +98,8 @@ public class EdqsProcessor implements TbQueueHandler, private final VersionsStore versionsStore = new VersionsStore(); + private final AtomicInteger counter = new AtomicInteger(); // FIXME: TMP + @PostConstruct private void init() { consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("edqs-consumer")); @@ -152,15 +156,17 @@ public class EdqsProcessor implements TbQueueHandler, responseTemplate.subscribe(withTopic(partitions, config.getRequestsTopic())); Set oldPartitions = event.getOldPartitions().get(new QueueKey(ServiceType.EDQS)); - Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() - .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); - if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { - log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); + if (CollectionsUtil.isNotEmpty(oldPartitions)) { + Set removedPartitions = Sets.difference(oldPartitions, newPartitions).stream() + .map(tpi -> tpi.getPartition().orElse(-1)).collect(Collectors.toSet()); + if (config.getPartitioningStrategy() != EdqsPartitioningStrategy.TENANT && !removedPartitions.isEmpty()) { + log.warn("Partitions {} were removed but shouldn't be (due to NONE partitioning strategy)", removedPartitions); + } + repository.clearIf(tenantId -> { + Integer partition = partitionService.resolvePartition(tenantId); + return partition != null && removedPartitions.contains(partition); + }); } - repository.clearIf(tenantId -> { - Integer partition = partitionService.resolvePartition(tenantId); - return partition != null && removedPartitions.contains(partition); - }); } catch (Throwable t) { log.error("Failed to handle partition change event {}", event, t); } @@ -221,7 +227,11 @@ public class EdqsProcessor implements TbQueueHandler, } EdqsObject object = converter.deserialize(objectType, eventMsg.getData().toByteArray()); - log.info("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version); + log.debug("[{}] Processing event [{}] [{}] [{}] [{}]", tenantId, objectType, eventType, key, version); + int count = counter.incrementAndGet(); + if (count % 100000 == 0) { + log.info("Processed {} events", count); + } EdqsEvent event = EdqsEvent.builder() .tenantId(tenantId) diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/ApiUsageStateQueryProcessor.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/ApiUsageStateQueryProcessor.java index c2821161c7..7dbda46068 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/ApiUsageStateQueryProcessor.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/query/processor/ApiUsageStateQueryProcessor.java @@ -76,8 +76,7 @@ public class ApiUsageStateQueryProcessor extends AbstractSingleEntityTypeQueryPr @Override protected boolean matches(EntityData ed) { ApiUsageStateFields entityFields = (ApiUsageStateFields) ed.getFields(); - return super.matches(ed) && (filter.getCustomerId() != null ? entityFields.getEntityId().equals(filter.getCustomerId()) : - entityFields.getEntityId().equals(repository.getTenantId())); + return super.matches(ed) && (filter.getCustomerId() == null || filter.getCustomerId().equals(entityFields.getEntityId())); } @Override diff --git a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java index e8e8ee610e..04ef73faa1 100644 --- a/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java +++ b/common/edqs/src/main/java/org/thingsboard/server/edqs/state/KafkaEdqsStateService.java @@ -71,7 +71,8 @@ public class KafkaEdqsStateService implements EdqsStateService { private ScheduledExecutorService scheduler; private final VersionsStore versionsStore = new VersionsStore(); - private final AtomicInteger restoredCount = new AtomicInteger(); + private final AtomicInteger stateReadCount = new AtomicInteger(); + private final AtomicInteger eventsReadCount = new AtomicInteger(); @PostConstruct private void init() { @@ -79,7 +80,7 @@ public class KafkaEdqsStateService implements EdqsStateService { mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(4, "edqs-backup-consumer-mgmt"); scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edqs-backup-scheduler"); - stateConsumer = MainQueueConsumerManager., QueueConfig>builder() + stateConsumer = MainQueueConsumerManager., QueueConfig>builder() // FIXME Slavik: if topic is empty .queueKey(new QueueKey(ServiceType.EDQS, EdqsQueue.STATE.getTopic())) .config(QueueConfig.of(true, config.getPollInterval())) .msgPackProcessor((msgs, consumer, config) -> { @@ -88,8 +89,8 @@ public class KafkaEdqsStateService implements EdqsStateService { ToEdqsMsg msg = queueMsg.getValue(); log.trace("Processing message: {}", msg); edqsProcessor.process(msg, EdqsQueue.STATE); - if (restoredCount.incrementAndGet() % 1000 == 0) { - log.info("Processed {} msgs", restoredCount.get()); + if (stateReadCount.incrementAndGet() % 100000 == 0) { + log.info("[state] Processed {} msgs", stateReadCount.get()); } } catch (Throwable t) { log.error("Failed to process message: {}", queueMsg, t); @@ -103,7 +104,7 @@ public class KafkaEdqsStateService implements EdqsStateService { .scheduler(scheduler) .build(); - eventsConsumer = QueueConsumerManager.>builder() + eventsConsumer = QueueConsumerManager.>builder() // FIXME Slavik writes to the state while we read it, slows down the start .name("edqs-events-to-backup-consumer") .pollInterval(config.getPollInterval()) .msgPackProcessor((msgs, consumer) -> { @@ -115,6 +116,9 @@ public class KafkaEdqsStateService implements EdqsStateService { if (msg.hasEventMsg()) { EdqsEventMsg eventMsg = msg.getEventMsg(); String key = eventMsg.getKey(); + if (eventsReadCount.incrementAndGet() % 100000 == 0) { + log.info("[events-to-backup] Processed {} msgs", eventsReadCount.get()); + } if (eventMsg.hasVersion()) { if (!versionsStore.isNew(key, eventMsg.getVersion())) { return; @@ -153,14 +157,14 @@ public class KafkaEdqsStateService implements EdqsStateService { @Override public void restore(Set partitions) { - restoredCount.set(0); + stateReadCount.set(0); //TODO Slavik: do not support remote mode in monolith setup long startTs = System.currentTimeMillis(); log.info("Restore started for partitions {}", partitions.stream().map(tpi -> tpi.getPartition().orElse(-1)).sorted().toList()); stateConsumer.doUpdate(partitions); // calling blocking doUpdate instead of update stateConsumer.awaitStop(0); // consumers should stop on their own because EdqsQueue.STATE.stopWhenRead is true, we just need to wait - log.info("Restore finished in {} ms. Processed {} msgs", (System.currentTimeMillis() - startTs), restoredCount.get()); + log.info("Restore finished in {} ms. Processed {} msgs", (System.currentTimeMillis() - startTs), stateReadCount.get()); } @Override diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java index 165a54b300..ea9ba3f245 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/discovery/HashPartitionService.java @@ -482,7 +482,7 @@ public class HashPartitionService implements PartitionService { private Set toTpiList(QueueKey queueKey, List partitions) { if (partitions == null) { - return null; + return Collections.emptySet(); } return partitions.stream() .map(partition -> buildTopicPartitionInfo(queueKey, partition)) diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java index 167fbb1751..47c776c742 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/kafka/TbKafkaConsumerTemplate.java @@ -144,10 +144,11 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue stopWatch.stop(); log.trace("poll topic {} took {}ms", getTopic(), stopWatch.getTotalTimeMillis()); + List> recordList; if (records.isEmpty()) { - return Collections.emptyList(); + recordList = Collections.emptyList(); } else { - List> recordList = new ArrayList<>(256); + recordList = new ArrayList<>(256); records.forEach(record -> { recordList.add(record); if (stopWhenRead) { @@ -163,17 +164,18 @@ public class TbKafkaConsumerTemplate extends AbstractTbQue } } }); - if (endOffsets != null && endOffsets.isEmpty()) { - log.info("Reached end offsets for {}, stopping consumer", consumer.assignment()); - stop(); - } - return recordList; } + if (stopWhenRead && endOffsets.isEmpty()) { + log.info("Reached end offset for {}, stopping consumer", consumer.assignment()); + stop(); + } + return recordList; } private void onPartitionsAssigned() { if (stopWhenRead) { endOffsets = consumer.endOffsets(consumer.assignment()).entrySet().stream() + .filter(entry -> entry.getValue() > 0) .collect(Collectors.toMap(entry -> entry.getKey().partition(), Map.Entry::getValue)); } } diff --git a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java index ca97792186..2fc7970fc9 100644 --- a/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java +++ b/common/stats/src/main/java/org/thingsboard/server/common/stats/DefaultStatsFactory.java @@ -38,7 +38,7 @@ public class DefaultStatsFactory implements StatsFactory { private static final Counter STUB_COUNTER = new StubCounter(); - @Autowired + @Autowired(required = false) // FIXME Slavik !!! private MeterRegistry meterRegistry; @Value("${metrics.enabled:false}") diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index 2c37788173..6678be6009 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -22,14 +22,13 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.UUID; /** * @author Andrew Shvayka @@ -42,14 +41,14 @@ public interface AttributesDao { List findAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope); - PageData findAll(PageLink pageLink); - ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, AttributeKvEntry attribute); List> removeAll(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys); List>> removeAllWithVersions(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, List keys); + List findNextBatch(UUID entityId, int attributeType, int attributeKey, int batchSize); + List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); List findAllKeysByEntityIds(TenantId tenantId, List entityIds); diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java index d29b57c5c3..d668f51dbd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java @@ -18,8 +18,6 @@ package org.thingsboard.server.dao.relation; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -43,8 +41,6 @@ public interface RelationDao { List findAllByToAndType(TenantId tenantId, EntityId to, String relationType, RelationTypeGroup typeGroup); - PageData findAll(PageLink pageLink); - ListenableFuture checkRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); boolean checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java index 90f49c57fd..de5651be00 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java @@ -59,4 +59,12 @@ public interface AttributeKvRepository extends JpaRepository findAllKeysByEntityIdsAndAttributeType(@Param("entityIds") List entityIds, @Param("attributeType") int attributeType); + + @Query(value = "SELECT attribute_key, attribute_type, entity_id, bool_v, dbl_v, json_v, last_update_ts, long_v, str_v, version FROM attribute_kv WHERE (entity_id, attribute_type, attribute_key) > " + + "(:entityId, :attributeType, :attributeKey) ORDER BY entity_id, attribute_type, attribute_key LIMIT :batchSize", nativeQuery = true) + List findNextBatch(@Param("entityId") UUID entityId, + @Param("attributeType") int attributeType, + @Param("attributeKey") int attributeKey, + @Param("batchSize") int batchSize); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java index b344a0ca41..db166c44b7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java @@ -23,7 +23,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.domain.Page; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.thingsboard.server.common.data.AttributeScope; @@ -31,8 +30,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; @@ -51,8 +48,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; import java.util.List; -import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; @@ -157,9 +154,8 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl } @Override - public PageData findAll(PageLink pageLink) { - Page attributes = attributeKvRepository.findAll(DaoUtil.toPageable(pageLink)); - return DaoUtil.pageToPageData(attributes); + public List findNextBatch(UUID entityId, int attributeType, int attributeKey, int batchSize) { + return attributeKvRepository.findNextBatch(entityId, attributeType, attributeKey, batchSize); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 31443f7f47..c54b5561ed 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -693,6 +693,10 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { SELECT_ADDRESS + ", " + SELECT_ADDRESS_2 + ", " + SELECT_ZIP + ", " + SELECT_PHONE + ", " + SELECT_ADDITIONAL_INFO + (entityFilter.isMultiRoot() ? (", " + SELECT_RELATED_PARENT_ID) : "") + ", entity.entity_type as entity_type"; + /* + * FIXME: + * target entities are duplicated in result list, if search direction is TO and multiple relations are references to target entity + * */ String from = getQueryTemplate(entityFilter.getDirection(), entityFilter.isMultiRoot()); if (entityFilter.isMultiRoot()) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java index 1df7115518..118c3c1bcc 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java @@ -24,8 +24,6 @@ import org.springframework.util.CollectionUtils; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.rule.RuleChainType; @@ -129,10 +127,6 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple typeGroup.name())); } - @Override - public PageData findAll(PageLink pageLink) { - return DaoUtil.toPageData(relationRepository.findAll(DaoUtil.toPageable(pageLink))); - } @Override public ListenableFuture checkRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/RelationRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/RelationRepository.java index d945f1f640..3c69ae095f 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/RelationRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/RelationRepository.java @@ -85,6 +85,15 @@ public interface RelationRepository @Query("DELETE FROM RelationEntity r where r.fromId = :fromId and r.fromType = :fromType and r.relationTypeGroup in :relationTypeGroups") void deleteByFromIdAndFromTypeAndRelationTypeGroupIn(@Param("fromId") UUID fromId, @Param("fromType") String fromType, @Param("relationTypeGroups") List relationTypeGroups); - @Query("SELECT e FROM RelationEntity e ORDER BY e.fromId, e.fromType, e.toId, e.toType, e.relationType, e.relationTypeGroup") - Page findAll(Pageable pageable); + @Query(value = "SELECT from_id, from_type, relation_type_group, relation_type, to_id, to_type, additional_info, version FROM relation" + + " WHERE (from_id, from_type, relation_type_group, relation_type, to_id, to_type) > " + + "(:fromId, :fromType, :relationTypeGroup, :relationType, :toId, :toType) ORDER BY " + + "from_id, from_type, relation_type_group, relation_type, to_id, to_type LIMIT :batchSize", nativeQuery = true) + List findNextBatch(@Param("fromId") UUID fromId, + @Param("fromType") String fromType, + @Param("relationTypeGroup") String relationTypeGroup, + @Param("relationType") String relationType, + @Param("toId") UUID toId, + @Param("toType") String toType, + @Param("batchSize") int batchSize); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index 0e9cebd150..e6175409de 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -171,9 +171,5 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries return sqlDao.findAllKeysByEntityIds(tenantId, entityIds); } - @Override - public PageData findAllLatest(PageLink pageLink) { - return null; - } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index fd1b180eef..dde89e107d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -188,10 +188,6 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); } - @Override - public PageData findAllLatest(PageLink pageLink) { - return DaoUtil.pageToPageData(tsKvLatestRepository.findAll(DaoUtil.toPageable(pageLink, "entityId", "key"))); - } private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/TsKvLatestRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/TsKvLatestRepository.java index 833ffd185e..2c97ba30ba 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/TsKvLatestRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/TsKvLatestRepository.java @@ -20,6 +20,7 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; +import org.thingsboard.server.dao.model.sql.AttributeKvEntity; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestCompositeKey; import org.thingsboard.server.dao.model.sqlts.latest.TsKvLatestEntity; @@ -46,4 +47,11 @@ public interface TsKvLatestRepository extends JpaRepository findAll(Pageable pageable); + + @Query(value = "SELECT entity_id, key, ts, bool_v, str_v, long_v, dbl_v, json_v, version FROM ts_kv_latest WHERE (entity_id, key) > " + + "(:entityId, :key) ORDER BY entity_id, key LIMIT :batchSize", nativeQuery = true) + List findNextBatch(@Param("entityId") UUID entityId, + @Param("key") int key, + @Param("batchSize") int batchSize); + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 99bf57a1db..6f00437672 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -103,10 +103,6 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Collections.emptyList(); } - @Override - public PageData findAllLatest(PageLink pageLink) { - return null; - } @Override public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index 1372f2f5d8..b2d7c0eff5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -54,5 +54,4 @@ public interface TimeseriesLatestDao { List findAllKeysByEntityIds(TenantId tenantId, List entityIds); - PageData findAllLatest(PageLink pageLink); } diff --git a/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java b/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java index 1157f74de1..7fed88b0c3 100644 --- a/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java +++ b/edqs/src/main/java/org/thingsboard/server/edqs/ThingsboardEdqsApplication.java @@ -30,7 +30,7 @@ import java.util.Arrays; @EnableAsync @EnableScheduling @ComponentScan({"org.thingsboard.server.edqs", "org.thingsboard.server.queue.edqs", "org.thingsboard.server.queue.discovery", "org.thingsboard.server.queue.kafka", - "org.thingsboard.server.queue.settings", "org.thingsboard.server.queue.environment"}) + "org.thingsboard.server.queue.settings", "org.thingsboard.server.queue.environment", "org.thingsboard.server.common.stats"}) @Slf4j public class ThingsboardEdqsApplication { @@ -42,8 +42,8 @@ public class ThingsboardEdqsApplication { } // @Bean - public ApplicationRunner runner(CSVLoader loader, EdqRepository edqRepository) { - return args -> { +// public ApplicationRunner runner(CSVLoader loader, EdqRepository edqRepository) { +// return args -> { // long startTs = System.currentTimeMillis(); // var loader = new TenantRepoLoader(new TenantRepo(TenantId.fromUUID(UUID.fromString("2a209df0-c7ff-11ea-a3e0-f321b0429d60")))); // loader.load(); @@ -103,8 +103,8 @@ public class ThingsboardEdqsApplication { // }); // Thread.sleep(5000); // } - }; - } +// }; +// } private static String[] updateArguments(String[] args) { if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) { diff --git a/edqs/src/main/resources/edqs.yml b/edqs/src/main/resources/edqs.yml index f5c83178c3..143a42fb68 100644 --- a/edqs/src/main/resources/edqs.yml +++ b/edqs/src/main/resources/edqs.yml @@ -52,15 +52,14 @@ queue: # For debug level print-interval-ms: "${TB_QUEUE_IN_MEMORY_STATS_PRINT_INTERVAL_MS:60000}" edqs: - enabled: "${TB_EDQS_ENABLED:true}" mode: "${TB_EDQS_MODE:local}" partitions: "${TB_EDQS_PARTITIONS:12}" + partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data requests_topic: "${TB_EDQS_REQUESTS_TOPIC:edqs.requests}" responses_topic: "${TB_EDQS_RESPONSES_TOPIC:edqs.responses}" poll_interval: "${TB_EDQS_POLL_INTERVAL_MS:125}" max_pending_requests: "${TB_EDQS_MAX_PENDING_REQUESTS:10000}" max_request_timeout: "${TB_EDQS_MAX_REQUEST_TIMEOUT:10000}" - partitioning_strategy: "${TB_EDQS_PARTITIONING_STRATEGY:tenant}" # tenant or none. For 'none', each instance handles all partitions and duplicates all the data kafka: # Kafka Bootstrap nodes in "host:port" format bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" @@ -171,11 +170,11 @@ queue: # Kafka properties for Housekeeper reprocessing topic; retention.ms is set to 90 days; partitions is set to 1 since only one reprocessing service is running at a time housekeeper-reprocessing: "${TB_QUEUE_KAFKA_HOUSEKEEPER_REPROCESSING_TOPIC_PROPERTIES:retention.ms:7776000000;segment.bytes:52428800;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" # Kafka properties for EDQS events topics. Partitions number must be the same as queue.edqs.partitions - edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" + edqs-events: "${TB_QUEUE_KAFKA_EDQS_EVENTS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1}" # Kafka properties for EDQS requests topic (default: 3 minutes retention). Partitions number must be the same as queue.edqs.partitions edqs-requests: "${TB_QUEUE_KAFKA_EDQS_REQUESTS_TOPIC_PROPERTIES:retention.ms:180000;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1}" # Kafka properties for EDQS state topic (infinite retention, compaction). Partitions number must be the same as queue.edqs.partitions - edqs-state: "${TB_QUEUE_KAFKA_EDQS_LATEST_EVENTS_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:1048576000;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" + edqs-state: "${TB_QUEUE_KAFKA_EDQS_STATE_TOPIC_PROPERTIES:retention.ms:-1;segment.bytes:52428800;retention.bytes:-1;partitions:12;min.insync.replicas:1;cleanup.policy:compact}" consumer-stats: # Prints lag between consumer group offset and last messages offset in Kafka topics enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"