diff --git a/README.md b/README.md index c9e6e17fc2..6bce2db844 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,6 @@ Collect and Visualize your IoT data in minutes by following this [guide](https:/ ## Support - - [Community chat](https://gitter.im/thingsboard/chat) - [Q&A forum](https://groups.google.com/forum/#!forum/thingsboard) - [Stackoverflow](http://stackoverflow.com/questions/tagged/thingsboard) diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql index e4faca82a5..3aa0f08edf 100644 --- a/application/src/main/data/upgrade/basic/schema_update.sql +++ b/application/src/main/data/upgrade/basic/schema_update.sql @@ -38,8 +38,8 @@ CREATE TABLE IF NOT EXISTS mobile_app_bundle ( ios_app_id uuid UNIQUE, layout_config varchar(16384), oauth2_enabled boolean, - CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id), - CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id) + CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL, + CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL ); CREATE INDEX IF NOT EXISTS mobile_app_bundle_tenant_id ON mobile_app_bundle(tenant_id); diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index a62cb432da..d27bf40650 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -110,6 +110,7 @@ import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.transport.TransportProtos; +import org.thingsboard.server.queue.TbQueueCallback; import org.thingsboard.server.queue.common.SimpleTbQueueCallback; import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider; import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; @@ -173,8 +174,13 @@ class DefaultTbContext implements TbContext { @Override public void input(TbMsg msg, RuleChainId ruleChainId) { - msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); - nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg)); + if (!msg.isValid()) { + return; + } + TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId); + tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId()); + TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator()); + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t))); } @Override @@ -210,14 +216,10 @@ class DefaultTbContext implements TbContext { } return; } - TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() - .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) - .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) - .setTbMsg(TbMsg.toByteString(tbMsg)).build(); if (nodeCtx.getSelf().isDebugMode()) { mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain"); } - mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback( + doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback( metadata -> { if (onSuccess != null) { onSuccess.run(); @@ -232,6 +234,14 @@ class DefaultTbContext implements TbContext { })); } + private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) { + TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder() + .setTenantIdMSB(getTenantId().getId().getMostSignificantBits()) + .setTenantIdLSB(getTenantId().getId().getLeastSignificantBits()) + .setTbMsg(TbMsg.toByteString(tbMsg)).build(); + mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback); + } + @Override public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { TopicPartitionInfo tpi = resolvePartition(tbMsg); diff --git a/application/src/main/java/org/thingsboard/server/config/CustomOAuth2AuthorizationRequestResolver.java b/application/src/main/java/org/thingsboard/server/config/CustomOAuth2AuthorizationRequestResolver.java index 542a7c71cb..c0dfaa599a 100644 --- a/application/src/main/java/org/thingsboard/server/config/CustomOAuth2AuthorizationRequestResolver.java +++ b/application/src/main/java/org/thingsboard/server/config/CustomOAuth2AuthorizationRequestResolver.java @@ -38,6 +38,7 @@ import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.OAuth2ClientId; +import org.thingsboard.server.common.data.oauth2.PlatformType; import org.thingsboard.server.dao.oauth2.OAuth2Configuration; import org.thingsboard.server.dao.oauth2.OAuth2ClientService; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -85,8 +86,9 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza String registrationId = this.resolveRegistrationId(request); String redirectUriAction = getAction(request, "login"); String appPackage = getAppPackage(request); + String platform = getPlatform(request); String appToken = getAppToken(request); - return resolve(request, registrationId, redirectUriAction, appPackage, appToken); + return resolve(request, registrationId, redirectUriAction, appPackage, platform, appToken); } @Override @@ -96,8 +98,9 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza } String redirectUriAction = getAction(request, "authorize"); String appPackage = getAppPackage(request); + String platform = getPlatform(request); String appToken = getAppToken(request); - return resolve(request, registrationId, redirectUriAction, appPackage, appToken); + return resolve(request, registrationId, redirectUriAction, appPackage, platform, appToken); } private String getAction(HttpServletRequest request, String defaultAction) { @@ -112,11 +115,15 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza return request.getParameter("pkg"); } + private String getPlatform(HttpServletRequest request) { + return request.getParameter("platform"); + } + private String getAppToken(HttpServletRequest request) { return request.getParameter("appToken"); } - private OAuth2AuthorizationRequest resolve(HttpServletRequest request, String oauth2ClientId, String redirectUriAction, String appPackage, String appToken) { + private OAuth2AuthorizationRequest resolve(HttpServletRequest request, String oauth2ClientId, String redirectUriAction, String appPackage, String platform, String appToken) { if (oauth2ClientId == null) { return null; } @@ -132,11 +139,19 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza if (StringUtils.isEmpty(appToken)) { throw new IllegalArgumentException("Invalid application token."); } else { - String appSecret = this.oAuth2ClientService.findAppSecret(new OAuth2ClientId(UUID.fromString(oauth2ClientId)), appPackage); - if (StringUtils.isEmpty(appSecret)) { - throw new IllegalArgumentException("Invalid package: " + appPackage + ". No application secret found for Client Registration with given application package."); + String callbackUrlScheme; + if (platform != null) { + callbackUrlScheme = validateMobileAppToken(oauth2ClientId, appPackage, PlatformType.valueOf(platform), appToken); + } else { + // for backward compatibility with mobile apps of version 1.3.0 and less try to validate token with android and then ios app secret + try { + callbackUrlScheme = validateMobileAppToken(oauth2ClientId, appPackage, PlatformType.ANDROID, appToken); + } catch (IllegalArgumentException e) { + log.debug("Failed attempt to validate android application token, oauth client id: [{}], package name: [{}], appToken [{}] ", + oauth2ClientId, appPackage, appToken, e); + callbackUrlScheme = validateMobileAppToken(oauth2ClientId, appPackage, PlatformType.IOS, appToken); + } } - String callbackUrlScheme = this.oAuth2AppTokenFactory.validateTokenAndGetCallbackUrlScheme(appPackage, appToken, appSecret); attributes.put(TbOAuth2ParameterNames.CALLBACK_URL_SCHEME, callbackUrlScheme); } } @@ -174,6 +189,14 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza .build(); } + private String validateMobileAppToken(String oauth2ClientId, String appPackage, PlatformType platformType, String appToken) { + String appSecret = this.oAuth2ClientService.findAppSecret(new OAuth2ClientId(UUID.fromString(oauth2ClientId)), appPackage, platformType); + if (StringUtils.isEmpty(appSecret)) { + throw new IllegalArgumentException("Invalid package: " + appPackage + ". No application secret found for Client Registration with given application package."); + } + return this.oAuth2AppTokenFactory.validateTokenAndGetCallbackUrlScheme(appPackage, appToken, appSecret); + } + private String resolveRegistrationId(HttpServletRequest request) { if (this.authorizationRequestMatcher.matches(request)) { return this.authorizationRequestMatcher diff --git a/application/src/main/java/org/thingsboard/server/controller/MobileAppController.java b/application/src/main/java/org/thingsboard/server/controller/MobileAppController.java index da206193fb..362539f6b6 100644 --- a/application/src/main/java/org/thingsboard/server/controller/MobileAppController.java +++ b/application/src/main/java/org/thingsboard/server/controller/MobileAppController.java @@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.mobile.LoginMobileInfo; import org.thingsboard.server.common.data.mobile.UserMobileInfo; import org.thingsboard.server.common.data.mobile.app.MobileApp; import org.thingsboard.server.common.data.mobile.app.MobileAppVersionInfo; +import org.thingsboard.server.common.data.mobile.app.StoreInfo; import org.thingsboard.server.common.data.mobile.bundle.MobileAppBundle; import org.thingsboard.server.common.data.mobile.layout.MobilePage; import org.thingsboard.server.common.data.oauth2.OAuth2ClientLoginInfo; @@ -55,6 +56,7 @@ import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; @@ -83,7 +85,9 @@ public class MobileAppController extends BaseController { @RequestParam PlatformType platform) { List oauth2Clients = oAuth2ClientService.findOAuth2ClientLoginInfosByMobilePkgNameAndPlatformType(pkgName, platform); MobileApp mobileApp = mobileAppService.findMobileAppByPkgNameAndPlatformType(pkgName, platform); - return new LoginMobileInfo(oauth2Clients, mobileApp != null ? mobileApp.getVersionInfo() : null); + StoreInfo storeInfo = Optional.ofNullable(mobileApp).map(MobileApp::getStoreInfo).orElse(null); + MobileAppVersionInfo versionInfo = Optional.ofNullable(mobileApp).map(MobileApp::getVersionInfo).orElse(null); + return new LoginMobileInfo(oauth2Clients, storeInfo, versionInfo); } @ApiOperation(value = "Get user mobile app basic info (getUserMobileInfo)", notes = AVAILABLE_FOR_ANY_AUTHORIZED_USER) @@ -97,17 +101,10 @@ public class MobileAppController extends BaseController { User user = userService.findUserById(securityUser.getTenantId(), securityUser.getId()); HomeDashboardInfo homeDashboardInfo = securityUser.isSystemAdmin() ? null : getHomeDashboardInfo(securityUser, user.getAdditionalInfo()); MobileAppBundle mobileAppBundle = mobileAppBundleService.findMobileAppBundleByPkgNameAndPlatform(securityUser.getTenantId(), pkgName, platform); - return new UserMobileInfo(user, homeDashboardInfo, getVisiblePages(mobileAppBundle)); - } - - @ApiOperation(value = "Get mobile app version info (getMobileVersionInfo)") - @GetMapping(value = "/mobile/versionInfo") - public MobileAppVersionInfo getMobileVersionInfo(@Parameter(description = "Mobile application package name") - @RequestParam String pkgName, - @Parameter(description = "Platform type", schema = @Schema(allowableValues = {"ANDROID", "IOS"})) - @RequestParam PlatformType platform) { MobileApp mobileApp = mobileAppService.findMobileAppByPkgNameAndPlatformType(pkgName, platform); - return mobileApp != null ? mobileApp.getVersionInfo() : null; + StoreInfo storeInfo = Optional.ofNullable(mobileApp).map(MobileApp::getStoreInfo).orElse(null); + MobileAppVersionInfo versionInfo = Optional.ofNullable(mobileApp).map(MobileApp::getVersionInfo).orElse(null); + return new UserMobileInfo(user, storeInfo, versionInfo, homeDashboardInfo, getVisiblePages(mobileAppBundle)); } @ApiOperation(value = "Save Or update Mobile app (saveMobileApp)", diff --git a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java index 5b741a18b7..91c4e04bdf 100644 --- a/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java +++ b/application/src/main/java/org/thingsboard/server/install/ThingsboardInstallService.java @@ -108,6 +108,7 @@ public class ThingsboardInstallService { String fromVersion = databaseSchemaVersionService.getDbSchemaVersion(); String toVersion = databaseSchemaVersionService.getPackageSchemaVersion(); cacheCleanupService.clearCache(fromVersion, toVersion); + log.info("Upgrading ThingsBoard from version {} to {} ...", fromVersion, toVersion); databaseEntitiesUpgradeService.upgradeDatabase(fromVersion, toVersion); // dataUpdateService.updateData(fromVersion, toVersion); installScripts.updateResourcesUsage(); diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java index 4e9e8e954c..693f70cc9d 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/user/DefaultUserService.java @@ -35,8 +35,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.entitiy.AbstractTbEntityService; import org.thingsboard.server.service.security.system.SystemSecurityService; -import java.util.concurrent.TimeUnit; - @Service @TbCoreComponent @AllArgsConstructor @@ -90,16 +88,10 @@ public class DefaultUserService extends AbstractTbEntityService implements TbUse public UserActivationLink getActivationLink(TenantId tenantId, CustomerId customerId, UserId userId, HttpServletRequest request) throws ThingsboardException { UserCredentials userCredentials = userService.findUserCredentialsByUserId(tenantId, userId); if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) { - long ttl = userCredentials.getActivationTokenTtl(); - if (ttl < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration - userCredentials = userService.generateUserActivationToken(userCredentials); - userCredentials = userService.saveUserCredentials(tenantId, userCredentials); - ttl = userCredentials.getActivationTokenTtl(); - log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userId); - } + userCredentials = userService.checkUserActivationToken(tenantId, userCredentials); String baseUrl = systemSecurityService.getBaseUrl(tenantId, customerId, request); String link = baseUrl + "/api/noauth/activate?activateToken=" + userCredentials.getActivateToken(); - return new UserActivationLink(link, ttl); + return new UserActivationLink(link, userCredentials.getActivationTokenTtl()); } else { throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS); } diff --git a/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java b/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java index 1c0424bdfd..2b1ba2dc12 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java @@ -53,6 +53,11 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti return; } + String product = getProductFromDb(); + if (!CURRENT_PRODUCT.equals(product)) { + onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, CURRENT_PRODUCT)); + } + if (dbSchemaVersion.equals(getPackageSchemaVersion())) { onSchemaSettingsError("Upgrade failed: database already upgraded to current version. You can set SKIP_SCHEMA_VERSION_CHECK to 'true' if force re-upgrade needed."); } @@ -62,11 +67,6 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti dbSchemaVersion, SUPPORTED_VERSIONS_FOR_UPGRADE )); } - - String product = getProductFromDb(); - if (!CURRENT_PRODUCT.equals(product)) { - onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, CURRENT_PRODUCT)); - } } @Deprecated(forRemoval = true, since = "3.9.0") diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java index 329881dbfa..62b3061df7 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultCacheCleanupService.java @@ -44,7 +44,7 @@ public class DefaultCacheCleanupService implements CacheCleanupService { @Override public void clearCache(String from, String to) throws Exception { log.info("Clearing cache to upgrade from version {} to {}", from, to); - clearAllCaches(); + clearAll(); } void clearAllCaches() { diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java b/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java index eb3760cac6..00797da449 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/ResourcesUpdater.java @@ -16,10 +16,12 @@ package org.thingsboard.server.service.install.update; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.HasImage; import org.thingsboard.server.common.data.id.DashboardId; @@ -42,6 +44,8 @@ import org.thingsboard.server.dao.widget.WidgetTypeDao; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleDao; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Function; @@ -86,44 +90,60 @@ public class ResourcesUpdater { log.debug("Created/updated system images and resources for default dashboard '{}'", defaultDashboard.getTitle()); } + @SneakyThrows public void updateDashboardsResources() { log.info("Updating resources usage in dashboards"); - var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512); - int totalCount = 0; - int updatedCount = 0; - for (DashboardId dashboardId : dashboards) { - Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId); - boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure - if (updated) { - dashboardService.saveDashboard(dashboard); - updatedCount++; - } - totalCount++; + var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "dashboards-resources-upgrade"); - if (totalCount % 1000 == 0) { - log.info("Processed {} dashboards, updated {}", totalCount, updatedCount); - } + var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512); + AtomicInteger totalCount = new AtomicInteger(); + AtomicInteger updatedCount = new AtomicInteger(); + for (DashboardId dashboardId : dashboards) { + executor.submit(() -> { + Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId); + boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure + if (updated) { + dashboardService.saveDashboard(dashboard); + updatedCount.incrementAndGet(); + } + if (totalCount.incrementAndGet() % 1000 == 0) { + log.info("Processed {} dashboards, updated {}", totalCount, updatedCount); + } + }); + } + + executor.shutdown(); + if (!executor.awaitTermination(10, TimeUnit.MINUTES)) { + throw new RuntimeException("Dashboards resources update timeout"); // just in case, should happen } log.info("Updated {} dashboards", updatedCount); } + @SneakyThrows public void updateWidgetsResources() { log.info("Updating resources usage in widgets"); - int totalCount = 0; - int updatedCount = 0; + var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "widgets-resources-upgrade"); + + AtomicInteger totalCount = new AtomicInteger(); + AtomicInteger updatedCount = new AtomicInteger(); var widgets = new PageDataIterable<>(widgetTypeService::findAllWidgetTypesIds, 512); for (WidgetTypeId widgetTypeId : widgets) { - WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId); - boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails); - if (updated) { - widgetTypeService.saveWidgetType(widgetTypeDetails); - updatedCount++; - } - totalCount++; + executor.submit(() -> { + WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId); + boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails); + if (updated) { + widgetTypeService.saveWidgetType(widgetTypeDetails); + updatedCount.incrementAndGet(); + } + if (totalCount.incrementAndGet() % 200 == 0) { + log.info("Processed {} widgets, updated {}", totalCount, updatedCount); + } + }); + } - if (totalCount % 200 == 0) { - log.info("Processed {} widgets, updated {}", totalCount, updatedCount); - } + executor.shutdown(); + if (!executor.awaitTermination(10, TimeUnit.MINUTES)) { + throw new RuntimeException("Widgets resources update timeout"); } log.info("Updated {} widgets", updatedCount); } diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java index ff764b7db3..50d8b9c371 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java @@ -31,7 +31,7 @@ import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.thingsboard.common.util.DonAsynchron; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; @@ -71,8 +71,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -90,16 +89,11 @@ public abstract class AbstractBulkImportService(150_000), - ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy()); - executor.allowCoreThreadTimeOut(true); - } + executor = ThingsBoardExecutors.newLimitedTasksExecutor(Runtime.getRuntime().availableProcessors(), 150_000, "bulk-import"); } public final BulkImportResult processBulkImport(BulkImportRequest request, SecurityUser user) throws Exception { @@ -287,8 +281,8 @@ public abstract class AbstractBulkImportService fields = new LinkedHashMap<>(); private final Map kvs = new LinkedHashMap<>(); private int lineNumber; + } @Data diff --git a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java index 9e36fed677..143c6c72a1 100644 --- a/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/rules/flow/AbstractRuleEngineFlowIntegrationTest.java @@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.mockito.Mockito.spy; @@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule RuleChain finalRuleChain = rootRuleChain; RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get(); + Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS) + .until(() -> + getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000) + .getData() + .stream() + .filter(filterByPostTelemetryEventType()) + .count() == 2 + ); + eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000); events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList()); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientService.java index 317221fe0a..f293462cc6 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientService.java @@ -40,7 +40,7 @@ public interface OAuth2ClientService extends EntityDaoService { OAuth2Client findOAuth2ClientById(TenantId tenantId, OAuth2ClientId providerId); - String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName); + String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName, PlatformType platformType); void deleteOAuth2ClientById(TenantId tenantId, OAuth2ClientId oAuth2ClientId); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java index 586ae50f5d..6412921675 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java @@ -63,6 +63,8 @@ public interface UserService extends EntityDaoService { UserCredentials generateUserActivationToken(UserCredentials userCredentials); + UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials); + UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials); void deleteUser(TenantId tenantId, User user); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/mobile/LoginMobileInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/mobile/LoginMobileInfo.java index 059dc17295..a28302bd58 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/mobile/LoginMobileInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/mobile/LoginMobileInfo.java @@ -16,9 +16,12 @@ package org.thingsboard.server.common.data.mobile; import org.thingsboard.server.common.data.mobile.app.MobileAppVersionInfo; +import org.thingsboard.server.common.data.mobile.app.StoreInfo; import org.thingsboard.server.common.data.oauth2.OAuth2ClientLoginInfo; import java.util.List; -public record LoginMobileInfo(List oAuth2ClientLoginInfos, MobileAppVersionInfo versionInfo) { +public record LoginMobileInfo(List oAuth2ClientLoginInfos, + StoreInfo storeInfo, + MobileAppVersionInfo versionInfo) { } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/mobile/UserMobileInfo.java b/common/data/src/main/java/org/thingsboard/server/common/data/mobile/UserMobileInfo.java index f7a4033b16..1d20f17bc1 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/mobile/UserMobileInfo.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/mobile/UserMobileInfo.java @@ -18,7 +18,13 @@ package org.thingsboard.server.common.data.mobile; import com.fasterxml.jackson.databind.JsonNode; import org.thingsboard.server.common.data.HomeDashboardInfo; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.mobile.app.MobileAppVersionInfo; +import org.thingsboard.server.common.data.mobile.app.StoreInfo; -public record UserMobileInfo(User user, HomeDashboardInfo homeDashboardInfo, JsonNode pages) { +public record UserMobileInfo(User user, + StoreInfo storeInfo, + MobileAppVersionInfo versionInfo, + HomeDashboardInfo homeDashboardInfo, + JsonNode pages) { } diff --git a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java index d7f64e58ee..54df37c3f6 100644 --- a/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java +++ b/common/transport/mqtt/src/main/java/org/thingsboard/server/transport/mqtt/MqttTransportHandler.java @@ -313,6 +313,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId); } break; + case SUBSCRIBE: + MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg; + processSubscribe(ctx, subscribeMessage); + break; + case UNSUBSCRIBE: + MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) msg; + processUnsubscribe(ctx, unsubscribeMessage); + break; case PINGREQ: ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0))); break; @@ -750,7 +758,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF))); return; } @@ -760,6 +768,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) { String topic = subscription.topicName(); MqttQoS reqQoS = subscription.qualityOfService(); + if (deviceSessionCtx.isProvisionOnly()) { + if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) { + registerSubQoS(topic, grantedQoSList, reqQoS); + } else { + log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS); + grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID)); + } + activityReported = true; + continue; + } if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) { processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1); activityReported = true; @@ -822,7 +840,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: @@ -873,7 +890,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement } private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) { - if (!checkConnected(ctx, mqttMsg)) { + if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) { ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue()))); return; @@ -887,6 +904,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement mqttQoSMap.remove(matcher); try { short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue(); + if (deviceSessionCtx.isProvisionOnly()) { + if (!matcher.matches(MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC)) { + resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue(); + } + unSubResults.add(resultValue); + activityReported = true; + continue; + } switch (topicName) { case MqttTopics.DEVICE_ATTRIBUTES_TOPIC: case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC: @@ -917,7 +942,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC: case MqttTopics.GATEWAY_RPC_TOPIC: case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC: - case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC: case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC: case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC: case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC: diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java index fde2362091..913f32c7f9 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java @@ -17,6 +17,9 @@ package org.thingsboard.common.util; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; public class ThingsBoardExecutors { @@ -48,4 +51,16 @@ public class ThingsBoardExecutors { return newWorkStealingPool(parallelism, clazz.getSimpleName()); } + /* + * executor with limited tasks queue size + * */ + public static ExecutorService newLimitedTasksExecutor(int threads, int maxQueueSize, String name) { + ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads, + 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(maxQueueSize), + ThingsBoardThreadFactory.forName(name), + new ThreadPoolExecutor.CallerRunsPolicy()); + executor.allowCoreThreadTimeOut(true); + return executor; + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/mobile/MobileAppServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/mobile/MobileAppServiceImpl.java index efd152f944..d3434e1016 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/mobile/MobileAppServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/mobile/MobileAppServiceImpl.java @@ -43,7 +43,6 @@ import java.util.Optional; public class MobileAppServiceImpl extends AbstractEntityService implements MobileAppService { private static final String PLATFORM_TYPE_IS_REQUIRED = "Platform type is required if package name is specified"; - private static final String MOBILE_APP_BUNDLE_CONSTRAINT = "The mobile app referenced by the mobile bundle cannot be deleted!"; @Autowired private MobileAppDao mobileAppDao; @@ -68,15 +67,8 @@ public class MobileAppServiceImpl extends AbstractEntityService implements Mobil @Override public void deleteMobileAppById(TenantId tenantId, MobileAppId mobileAppId) { log.trace("Executing deleteMobileAppById [{}]", mobileAppId.getId()); - try { - mobileAppDao.removeById(tenantId, mobileAppId.getId()); - eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(mobileAppId).build()); - } catch (Exception e) { - checkConstraintViolation(e, - Map.of("fk_android_app_id", MOBILE_APP_BUNDLE_CONSTRAINT, - "fk_ios_app_id", MOBILE_APP_BUNDLE_CONSTRAINT)); - throw e; - } + mobileAppDao.removeById(tenantId, mobileAppId.getId()); + eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(mobileAppId).build()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientDao.java b/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientDao.java index 01d56feb20..16dda80840 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientDao.java @@ -38,7 +38,7 @@ public interface OAuth2ClientDao extends Dao { List findByMobileAppBundleId(UUID mobileAppBundleId); - String findAppSecret(UUID id, String pkgName); + String findAppSecret(UUID id, String pkgName, PlatformType platformType); void deleteByTenantId(UUID tenantId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientServiceImpl.java index 880329335c..683c323ab5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/oauth2/OAuth2ClientServiceImpl.java @@ -34,6 +34,7 @@ import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent; import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent; import org.thingsboard.server.dao.service.DataValidator; +import org.thingsboard.server.dao.service.Validator; import java.util.Comparator; import java.util.List; @@ -45,6 +46,8 @@ import java.util.stream.Collectors; @Service("OAuth2ClientService") public class OAuth2ClientServiceImpl extends AbstractEntityService implements OAuth2ClientService { + private static final String PLATFORM_TYPE_IS_REQUIRED = "Platform type is required if package name is specified"; + @Autowired private OAuth2ClientDao oauth2ClientDao; @Autowired @@ -90,9 +93,10 @@ public class OAuth2ClientServiceImpl extends AbstractEntityService implements OA } @Override - public String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName) { - log.trace("Executing findAppSecret oAuth2ClientId = [{}] pkgName = [{}]", oAuth2ClientId, pkgName); - return oauth2ClientDao.findAppSecret(oAuth2ClientId.getId(), pkgName); + public String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName, PlatformType platformType) { + log.trace("Executing findAppSecret oAuth2ClientId = [{}] pkgName = [{}], platform [{}]", oAuth2ClientId, pkgName, platformType); + Validator.checkNotNull(platformType, PLATFORM_TYPE_IS_REQUIRED); + return oauth2ClientDao.findAppSecret(oAuth2ClientId.getId(), pkgName, platformType); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/oauth2/JpaOAuth2ClientDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/oauth2/JpaOAuth2ClientDao.java index d694ca0f93..4310e0b705 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/oauth2/JpaOAuth2ClientDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/oauth2/JpaOAuth2ClientDao.java @@ -80,8 +80,8 @@ public class JpaOAuth2ClientDao extends JpaAbstractDaocs_tb_ prefix, to avoid the data insertion to the common TB tables.
" + "Note: rule node can be used only for Cassandra DB.", @@ -87,11 +91,13 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { config = TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class); cassandraCluster = ctx.getCassandraCluster(); if (cassandraCluster == null) { - throw new RuntimeException("Unable to connect to Cassandra database"); - } else { - startExecutor(); - saveStmt = getSaveStmt(); + throw new TbNodeException("Unable to connect to Cassandra database", true); } + if (!isTableExists()) { + throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster."); + } + startExecutor(); + saveStmt = getSaveStmt(); } @Override @@ -115,6 +121,12 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } } + private boolean isTableExists() { + var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName()); + return keyspaceMdOpt.map(keyspaceMetadata -> + keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).isPresent()).orElse(false); + } + private PreparedStatement prepare(String query) { return getSession().prepare(query); } @@ -127,10 +139,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { return session; } - private PreparedStatement getSaveStmt() { + private PreparedStatement getSaveStmt() throws TbNodeException { fieldsMap = config.getFieldsMapping(); if (fieldsMap.isEmpty()) { - throw new RuntimeException("Fields(key,value) map is empty!"); + throw new TbNodeException("Fields(key,value) map is empty!", true); } else { return prepareStatement(new ArrayList<>(fieldsMap.values())); } @@ -163,16 +175,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { query.append("?, "); } } + if (config.getDefaultTtl() > 0) { + query.append(" USING TTL ?"); + } return query.toString(); } private ListenableFuture save(TbMsg msg, TbContext ctx) { JsonElement data = JsonParser.parseString(msg.getData()); if (!data.isJsonObject()) { - throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data); + throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data); } else { JsonObject dataAsObject = data.getAsJsonObject(); - BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(saveStmt.bind()); + BoundStatementBuilder stmtBuilder = getStmtBuilder(); AtomicInteger i = new AtomicInteger(0); fieldsMap.forEach((key, value) -> { if (key.equals(ENTITY_ID)) { @@ -197,17 +212,24 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { } else if (dataKeyElement.isJsonObject()) { stmtBuilder.setString(i.get(), dataKeyElement.getAsJsonObject().toString()); } else { - throw new IllegalStateException("Message data key: '" + key + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!"); + throw new IllegalStateException("Message data key: '" + key + "' with value: '" + dataKeyElement + "' is not a JSON Object or JSON Primitive!"); } } else { throw new RuntimeException("Message data doesn't contain key: " + "'" + key + "'!"); } i.getAndIncrement(); }); + if (config.getDefaultTtl() > 0) { + stmtBuilder.setInt(i.get(), config.getDefaultTtl()); + } return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null); } } + BoundStatementBuilder getStmtBuilder() { + return new BoundStatementBuilder(saveStmt.bind()); + } + private TbResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) { return executeAsync(ctx, statement, defaultWriteLevel); } @@ -240,4 +262,20 @@ public class TbSaveToCustomCassandraTableNode implements TbNode { }, readResultsProcessingExecutor); } + @Override + public TbPair upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException { + boolean hasChanges = false; + switch (fromVersion) { + case 0: + if (!oldConfiguration.has("defaultTtl")) { + hasChanges = true; + ((ObjectNode) oldConfiguration).put("defaultTtl", 0); + } + break; + default: + break; + } + return new TbPair<>(hasChanges, oldConfiguration); + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java index 0a5f153192..fefe871f82 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeConfiguration.java @@ -27,11 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig private String tableName; private Map fieldsMapping; + private int defaultTtl; @Override public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() { TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration(); + configuration.setDefaultTtl(0); configuration.setTableName(""); Map map = new HashMap<>(); map.put("", ""); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java new file mode 100644 index 0000000000..7df2a982de --- /dev/null +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbSaveToCustomCassandraTableNodeTest.java @@ -0,0 +1,396 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.action; + +import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; +import com.datastax.oss.driver.api.core.ProtocolVersion; +import com.datastax.oss.driver.api.core.cql.BoundStatement; +import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder; +import com.datastax.oss.driver.api.core.cql.ColumnDefinitions; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.metadata.Metadata; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; +import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; +import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry; +import com.google.common.util.concurrent.SettableFuture; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ListeningExecutor; +import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; +import org.thingsboard.rule.engine.TestDbCallbackExecutor; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.cassandra.CassandraCluster; +import org.thingsboard.server.dao.cassandra.guava.GuavaSession; +import org.thingsboard.server.dao.nosql.CassandraStatementTask; +import org.thingsboard.server.dao.nosql.TbResultSet; +import org.thingsboard.server.dao.nosql.TbResultSetFuture; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.then; +import static org.mockito.BDDMockito.willAnswer; + +@ExtendWith(MockitoExtension.class) +public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgradeTest { + + private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7")); + private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e")); + + private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor(); + + private TbSaveToCustomCassandraTableNode node; + private TbSaveToCustomCassandraTableNodeConfiguration config; + + @Mock + private TbContext ctxMock; + @Mock + private CassandraCluster cassandraClusterMock; + @Mock + private GuavaSession sessionMock; + @Mock + private PreparedStatement preparedStatementMock; + @Mock + private BoundStatement boundStatementMock; + @Mock + private BoundStatementBuilder boundStatementBuilderMock; + @Mock + private ColumnDefinitions columnDefinitionsMock; + @Mock + private CodecRegistry codecRegistryMock; + @Mock + private ProtocolVersion protocolVersionMock; + @Mock + private Node nodeMock; + @Mock + private Metadata metadataMock; + @Mock + private KeyspaceMetadata keyspaceMetadataMock; + @Mock + private TableMetadata tableMetadataMock; + + @BeforeEach + public void setUp() { + node = spy(new TbSaveToCustomCassandraTableNode()); + config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration(); + } + + @AfterEach + public void tearDown() { + node.destroy(); + } + + @Test + public void verifyDefaultConfig() { + assertThat(config.getTableName()).isEqualTo(""); + assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", "")); + assertThat(config.getDefaultTtl()).isEqualTo(0); + } + + @Test + public void givenCassandraClusterIsMissing_whenInit_thenThrowsException() { + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Unable to connect to Cassandra database") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(true); + } + + @Test + public void givenTableDoesNotExist_whenInit_thenThrowsException() { + config.setTableName("test_table"); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + mockCassandraCluster(); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty()); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.") + .extracting(e -> ((TbNodeException) e).isUnrecoverable()) + .isEqualTo(false); + } + + @Test + public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() { + config.setTableName("test_table"); + config.setFieldsMapping(emptyMap()); + var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config)); + + mockCassandraCluster(); + + assertThatThrownBy(() -> node.init(ctxMock, configuration)) + .isInstanceOf(TbNodeException.class) + .hasMessage("Fields(key,value) map is empty!"); + } + + @Test + public void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + + mockOnInit(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Invalid message structure, it is not a JSON Object: " + null); + } + + @Test + public void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + + mockOnInit(); + mockBoundStatement(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + String data = """ + { + "humidity": 77 + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data doesn't contain key: 'temp'!"); + } + + @Test + public void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException { + config.setTableName("temperature_sensor"); + config.setFieldsMapping(Map.of("temp", "temperature")); + + mockOnInit(); + mockBoundStatement(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + + String data = """ + { + "temp": [value] + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + assertThatThrownBy(() -> node.onMsg(ctxMock, msg)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!"); + } + + @ParameterizedTest + @MethodSource + public void givenTtl_whenOnMsg_thenVerifyStatement(int ttlFromConfig, + String expectedQuery, + Consumer verifyBuilder) throws TbNodeException { + config.setTableName("readings"); + config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn")); + config.setDefaultTtl(ttlFromConfig); + + mockOnInit(); + willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder(); + given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.build()).willReturn(boundStatementMock); + mockSubmittingCassandraTask(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT); + node.onMsg(ctxMock, msg); + + then(sessionMock).should().prepare(expectedQuery); + verifyBuilder.accept(boundStatementBuilderMock); + } + + private static Stream givenTtl_whenOnMsg_thenVerifyStatement() { + return Stream.of( + Arguments.of(0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)", + (Consumer) builder -> { + then(builder).should(never()).setInt(anyInt(), anyInt()); + }), + Arguments.of(20, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?", + (Consumer) builder -> { + then(builder).should().setInt(1, 20); + }) + ); + } + + @Test + public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException { + config.setDefaultTtl(25); + config.setTableName("readings"); + Map mappings = Map.of( + "$entityId", "entityIdTableColumn", + "doubleField", "doubleTableColumn", + "longField", "longTableColumn", + "booleanField", "booleanTableColumn", + "stringField", "stringTableColumn", + "jsonField", "jsonTableColumn" + ); + config.setFieldsMapping(mappings); + + mockOnInit(); + mockBoundStatementBuilder(); + mockSubmittingCassandraTask(); + + node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config))); + String data = """ + { + "doubleField": 22.5, + "longField": 56, + "booleanField": true, + "stringField": "some string", + "jsonField": { + "key": "value" + } + } + """; + TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data); + node.onMsg(ctxMock, msg); + + verifySettingStatementBuilder(); + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class); + then(ctxMock).should().submitCassandraWriteTask(taskCaptor.capture()); + CassandraStatementTask task = taskCaptor.getValue(); + assertThat(task.getTenantId()).isEqualTo(TENANT_ID); + assertThat(task.getSession()).isEqualTo(sessionMock); + assertThat(task.getStatement()).isEqualTo(boundStatementMock); + await().atMost(1, TimeUnit.SECONDS).untilAsserted( + () -> then(ctxMock).should().tellSuccess(msg) + ); + } + + @Override + protected TbNode getTestNode() { + return node; + } + + private static Stream givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() { + return Stream.of( + // config for version 1 with upgrade from version 0 + Arguments.of(0, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}", + true, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}" + ), + // default config for version 1 with upgrade from version 1 + Arguments.of(1, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}", + false, + "{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}" + ) + ); + } + + private void mockOnInit() { + mockCassandraCluster(); + given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE); + given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock); + } + + private void mockCassandraCluster() { + given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock); + given(cassandraClusterMock.getSession()).willReturn(sessionMock); + given(sessionMock.getMetadata()).willReturn(metadataMock); + given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace"); + given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock)); + given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock)); + } + + private void mockBoundStatement() { + given(preparedStatementMock.bind()).willReturn(boundStatementMock); + given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock); + given(preparedStatementMock.getVariableDefinitions()).willReturn(columnDefinitionsMock); + given(boundStatementMock.codecRegistry()).willReturn(codecRegistryMock); + given(boundStatementMock.protocolVersion()).willReturn(protocolVersionMock); + given(boundStatementMock.getNode()).willReturn(nodeMock); + } + + private void mockBoundStatementBuilder() { + willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder(); + given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setDouble(anyInt(), anyDouble())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setLong(anyInt(), anyLong())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setBoolean(anyInt(), anyBoolean())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setString(anyInt(), anyString())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.setInt(anyInt(), anyInt())).willReturn(boundStatementBuilderMock); + given(boundStatementBuilderMock.build()).willReturn(boundStatementMock); + } + + private void mockSubmittingCassandraTask() { + given(ctxMock.getTenantId()).willReturn(TENANT_ID); + willAnswer(invocation -> { + SettableFuture mainFuture = SettableFuture.create(); + mainFuture.set(new TbResultSet(null, null, null)); + return new TbResultSetFuture(mainFuture); + }).given(ctxMock).submitCassandraWriteTask(any()); + given(ctxMock.getDbCallbackExecutor()).willReturn(dbCallbackExecutor); + } + + private void verifySettingStatementBuilder() { + Map fieldsMap = (Map) ReflectionTestUtils.getField(node, "fieldsMap"); + List values = new ArrayList<>(fieldsMap.values()); + then(boundStatementBuilderMock).should().setUuid(values.indexOf("entityIdTableColumn"), DEVICE_ID.getId()); + then(boundStatementBuilderMock).should().setDouble(values.indexOf("doubleTableColumn"), 22.5); + then(boundStatementBuilderMock).should().setLong(values.indexOf("longTableColumn"), 56L); + then(boundStatementBuilderMock).should().setBoolean(values.indexOf("booleanTableColumn"), true); + then(boundStatementBuilderMock).should().setString(values.indexOf("stringTableColumn"), "some string"); + then(boundStatementBuilderMock).should().setString(values.indexOf("jsonTableColumn"), "{\"key\":\"value\"}"); + then(boundStatementBuilderMock).should().setInt(values.size(), 25); + } + +}