merge with master

This commit is contained in:
YevhenBondarenko 2024-11-28 16:25:18 +01:00
commit 9c826f4b70
30 changed files with 705 additions and 120 deletions

View File

@ -32,7 +32,6 @@ Collect and Visualize your IoT data in minutes by following this [guide](https:/
## Support
- [Community chat](https://gitter.im/thingsboard/chat)
- [Q&A forum](https://groups.google.com/forum/#!forum/thingsboard)
- [Stackoverflow](http://stackoverflow.com/questions/tagged/thingsboard)

View File

@ -38,8 +38,8 @@ CREATE TABLE IF NOT EXISTS mobile_app_bundle (
ios_app_id uuid UNIQUE,
layout_config varchar(16384),
oauth2_enabled boolean,
CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id),
CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id)
CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL,
CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL
);
CREATE INDEX IF NOT EXISTS mobile_app_bundle_tenant_id ON mobile_app_bundle(tenant_id);

View File

@ -110,6 +110,7 @@ import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.queue.TbQueueCallback;
import org.thingsboard.server.queue.common.SimpleTbQueueCallback;
import org.thingsboard.server.service.executors.PubSubRuleNodeExecutorProvider;
import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
@ -173,8 +174,13 @@ class DefaultTbContext implements TbContext {
@Override
public void input(TbMsg msg, RuleChainId ruleChainId) {
msg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
nodeCtx.getChainActor().tell(new RuleChainInputMsg(ruleChainId, msg));
if (!msg.isValid()) {
return;
}
TbMsg tbMsg = msg.copyWithRuleChainId(ruleChainId);
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
}
@Override
@ -210,14 +216,10 @@ class DefaultTbContext implements TbContext {
}
return;
}
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
if (nodeCtx.getSelf().isDebugMode()) {
mainCtx.persistDebugOutput(nodeCtx.getTenantId(), nodeCtx.getSelf().getId(), tbMsg, "To Root Rule Chain");
}
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, new SimpleTbQueueCallback(
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(
metadata -> {
if (onSuccess != null) {
onSuccess.run();
@ -232,6 +234,14 @@ class DefaultTbContext implements TbContext {
}));
}
private void doEnqueue(TopicPartitionInfo tpi, TbMsg tbMsg, TbQueueCallback callback) {
TransportProtos.ToRuleEngineMsg msg = TransportProtos.ToRuleEngineMsg.newBuilder()
.setTenantIdMSB(getTenantId().getId().getMostSignificantBits())
.setTenantIdLSB(getTenantId().getId().getLeastSignificantBits())
.setTbMsg(TbMsg.toByteString(tbMsg)).build();
mainCtx.getClusterService().pushMsgToRuleEngine(tpi, tbMsg.getId(), msg, callback);
}
@Override
public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
TopicPartitionInfo tpi = resolvePartition(tbMsg);

View File

@ -38,6 +38,7 @@ import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.id.OAuth2ClientId;
import org.thingsboard.server.common.data.oauth2.PlatformType;
import org.thingsboard.server.dao.oauth2.OAuth2Configuration;
import org.thingsboard.server.dao.oauth2.OAuth2ClientService;
import org.thingsboard.server.queue.util.TbCoreComponent;
@ -85,8 +86,9 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza
String registrationId = this.resolveRegistrationId(request);
String redirectUriAction = getAction(request, "login");
String appPackage = getAppPackage(request);
String platform = getPlatform(request);
String appToken = getAppToken(request);
return resolve(request, registrationId, redirectUriAction, appPackage, appToken);
return resolve(request, registrationId, redirectUriAction, appPackage, platform, appToken);
}
@Override
@ -96,8 +98,9 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza
}
String redirectUriAction = getAction(request, "authorize");
String appPackage = getAppPackage(request);
String platform = getPlatform(request);
String appToken = getAppToken(request);
return resolve(request, registrationId, redirectUriAction, appPackage, appToken);
return resolve(request, registrationId, redirectUriAction, appPackage, platform, appToken);
}
private String getAction(HttpServletRequest request, String defaultAction) {
@ -112,11 +115,15 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza
return request.getParameter("pkg");
}
private String getPlatform(HttpServletRequest request) {
return request.getParameter("platform");
}
private String getAppToken(HttpServletRequest request) {
return request.getParameter("appToken");
}
private OAuth2AuthorizationRequest resolve(HttpServletRequest request, String oauth2ClientId, String redirectUriAction, String appPackage, String appToken) {
private OAuth2AuthorizationRequest resolve(HttpServletRequest request, String oauth2ClientId, String redirectUriAction, String appPackage, String platform, String appToken) {
if (oauth2ClientId == null) {
return null;
}
@ -132,11 +139,19 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza
if (StringUtils.isEmpty(appToken)) {
throw new IllegalArgumentException("Invalid application token.");
} else {
String appSecret = this.oAuth2ClientService.findAppSecret(new OAuth2ClientId(UUID.fromString(oauth2ClientId)), appPackage);
if (StringUtils.isEmpty(appSecret)) {
throw new IllegalArgumentException("Invalid package: " + appPackage + ". No application secret found for Client Registration with given application package.");
String callbackUrlScheme;
if (platform != null) {
callbackUrlScheme = validateMobileAppToken(oauth2ClientId, appPackage, PlatformType.valueOf(platform), appToken);
} else {
// for backward compatibility with mobile apps of version 1.3.0 and less try to validate token with android and then ios app secret
try {
callbackUrlScheme = validateMobileAppToken(oauth2ClientId, appPackage, PlatformType.ANDROID, appToken);
} catch (IllegalArgumentException e) {
log.debug("Failed attempt to validate android application token, oauth client id: [{}], package name: [{}], appToken [{}] ",
oauth2ClientId, appPackage, appToken, e);
callbackUrlScheme = validateMobileAppToken(oauth2ClientId, appPackage, PlatformType.IOS, appToken);
}
}
String callbackUrlScheme = this.oAuth2AppTokenFactory.validateTokenAndGetCallbackUrlScheme(appPackage, appToken, appSecret);
attributes.put(TbOAuth2ParameterNames.CALLBACK_URL_SCHEME, callbackUrlScheme);
}
}
@ -174,6 +189,14 @@ public class CustomOAuth2AuthorizationRequestResolver implements OAuth2Authoriza
.build();
}
private String validateMobileAppToken(String oauth2ClientId, String appPackage, PlatformType platformType, String appToken) {
String appSecret = this.oAuth2ClientService.findAppSecret(new OAuth2ClientId(UUID.fromString(oauth2ClientId)), appPackage, platformType);
if (StringUtils.isEmpty(appSecret)) {
throw new IllegalArgumentException("Invalid package: " + appPackage + ". No application secret found for Client Registration with given application package.");
}
return this.oAuth2AppTokenFactory.validateTokenAndGetCallbackUrlScheme(appPackage, appToken, appSecret);
}
private String resolveRegistrationId(HttpServletRequest request) {
if (this.authorizationRequestMatcher.matches(request)) {
return this.authorizationRequestMatcher

View File

@ -41,6 +41,7 @@ import org.thingsboard.server.common.data.mobile.LoginMobileInfo;
import org.thingsboard.server.common.data.mobile.UserMobileInfo;
import org.thingsboard.server.common.data.mobile.app.MobileApp;
import org.thingsboard.server.common.data.mobile.app.MobileAppVersionInfo;
import org.thingsboard.server.common.data.mobile.app.StoreInfo;
import org.thingsboard.server.common.data.mobile.bundle.MobileAppBundle;
import org.thingsboard.server.common.data.mobile.layout.MobilePage;
import org.thingsboard.server.common.data.oauth2.OAuth2ClientLoginInfo;
@ -55,6 +56,7 @@ import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.security.permission.Resource;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
@ -83,7 +85,9 @@ public class MobileAppController extends BaseController {
@RequestParam PlatformType platform) {
List<OAuth2ClientLoginInfo> oauth2Clients = oAuth2ClientService.findOAuth2ClientLoginInfosByMobilePkgNameAndPlatformType(pkgName, platform);
MobileApp mobileApp = mobileAppService.findMobileAppByPkgNameAndPlatformType(pkgName, platform);
return new LoginMobileInfo(oauth2Clients, mobileApp != null ? mobileApp.getVersionInfo() : null);
StoreInfo storeInfo = Optional.ofNullable(mobileApp).map(MobileApp::getStoreInfo).orElse(null);
MobileAppVersionInfo versionInfo = Optional.ofNullable(mobileApp).map(MobileApp::getVersionInfo).orElse(null);
return new LoginMobileInfo(oauth2Clients, storeInfo, versionInfo);
}
@ApiOperation(value = "Get user mobile app basic info (getUserMobileInfo)", notes = AVAILABLE_FOR_ANY_AUTHORIZED_USER)
@ -97,17 +101,10 @@ public class MobileAppController extends BaseController {
User user = userService.findUserById(securityUser.getTenantId(), securityUser.getId());
HomeDashboardInfo homeDashboardInfo = securityUser.isSystemAdmin() ? null : getHomeDashboardInfo(securityUser, user.getAdditionalInfo());
MobileAppBundle mobileAppBundle = mobileAppBundleService.findMobileAppBundleByPkgNameAndPlatform(securityUser.getTenantId(), pkgName, platform);
return new UserMobileInfo(user, homeDashboardInfo, getVisiblePages(mobileAppBundle));
}
@ApiOperation(value = "Get mobile app version info (getMobileVersionInfo)")
@GetMapping(value = "/mobile/versionInfo")
public MobileAppVersionInfo getMobileVersionInfo(@Parameter(description = "Mobile application package name")
@RequestParam String pkgName,
@Parameter(description = "Platform type", schema = @Schema(allowableValues = {"ANDROID", "IOS"}))
@RequestParam PlatformType platform) {
MobileApp mobileApp = mobileAppService.findMobileAppByPkgNameAndPlatformType(pkgName, platform);
return mobileApp != null ? mobileApp.getVersionInfo() : null;
StoreInfo storeInfo = Optional.ofNullable(mobileApp).map(MobileApp::getStoreInfo).orElse(null);
MobileAppVersionInfo versionInfo = Optional.ofNullable(mobileApp).map(MobileApp::getVersionInfo).orElse(null);
return new UserMobileInfo(user, storeInfo, versionInfo, homeDashboardInfo, getVisiblePages(mobileAppBundle));
}
@ApiOperation(value = "Save Or update Mobile app (saveMobileApp)",

View File

@ -108,6 +108,7 @@ public class ThingsboardInstallService {
String fromVersion = databaseSchemaVersionService.getDbSchemaVersion();
String toVersion = databaseSchemaVersionService.getPackageSchemaVersion();
cacheCleanupService.clearCache(fromVersion, toVersion);
log.info("Upgrading ThingsBoard from version {} to {} ...", fromVersion, toVersion);
databaseEntitiesUpgradeService.upgradeDatabase(fromVersion, toVersion);
// dataUpdateService.updateData(fromVersion, toVersion);
installScripts.updateResourcesUsage();

View File

@ -35,8 +35,6 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.entitiy.AbstractTbEntityService;
import org.thingsboard.server.service.security.system.SystemSecurityService;
import java.util.concurrent.TimeUnit;
@Service
@TbCoreComponent
@AllArgsConstructor
@ -90,16 +88,10 @@ public class DefaultUserService extends AbstractTbEntityService implements TbUse
public UserActivationLink getActivationLink(TenantId tenantId, CustomerId customerId, UserId userId, HttpServletRequest request) throws ThingsboardException {
UserCredentials userCredentials = userService.findUserCredentialsByUserId(tenantId, userId);
if (!userCredentials.isEnabled() && userCredentials.getActivateToken() != null) {
long ttl = userCredentials.getActivationTokenTtl();
if (ttl < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration
userCredentials = userService.generateUserActivationToken(userCredentials);
userCredentials = userService.saveUserCredentials(tenantId, userCredentials);
ttl = userCredentials.getActivationTokenTtl();
log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userId);
}
userCredentials = userService.checkUserActivationToken(tenantId, userCredentials);
String baseUrl = systemSecurityService.getBaseUrl(tenantId, customerId, request);
String link = baseUrl + "/api/noauth/activate?activateToken=" + userCredentials.getActivateToken();
return new UserActivationLink(link, ttl);
return new UserActivationLink(link, userCredentials.getActivationTokenTtl());
} else {
throw new ThingsboardException("User is already activated!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}

View File

@ -53,6 +53,11 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
return;
}
String product = getProductFromDb();
if (!CURRENT_PRODUCT.equals(product)) {
onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, CURRENT_PRODUCT));
}
if (dbSchemaVersion.equals(getPackageSchemaVersion())) {
onSchemaSettingsError("Upgrade failed: database already upgraded to current version. You can set SKIP_SCHEMA_VERSION_CHECK to 'true' if force re-upgrade needed.");
}
@ -62,11 +67,6 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
dbSchemaVersion, SUPPORTED_VERSIONS_FOR_UPGRADE
));
}
String product = getProductFromDb();
if (!CURRENT_PRODUCT.equals(product)) {
onSchemaSettingsError(String.format("Upgrade failed: can't upgrade ThingsBoard %s database using ThingsBoard %s.", product, CURRENT_PRODUCT));
}
}
@Deprecated(forRemoval = true, since = "3.9.0")

View File

@ -44,7 +44,7 @@ public class DefaultCacheCleanupService implements CacheCleanupService {
@Override
public void clearCache(String from, String to) throws Exception {
log.info("Clearing cache to upgrade from version {} to {}", from, to);
clearAllCaches();
clearAll();
}
void clearAllCaches() {

View File

@ -16,10 +16,12 @@
package org.thingsboard.server.service.install.update;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.Dashboard;
import org.thingsboard.server.common.data.HasImage;
import org.thingsboard.server.common.data.id.DashboardId;
@ -42,6 +44,8 @@ import org.thingsboard.server.dao.widget.WidgetTypeDao;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleDao;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
@ -86,44 +90,60 @@ public class ResourcesUpdater {
log.debug("Created/updated system images and resources for default dashboard '{}'", defaultDashboard.getTitle());
}
@SneakyThrows
public void updateDashboardsResources() {
log.info("Updating resources usage in dashboards");
var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512);
int totalCount = 0;
int updatedCount = 0;
for (DashboardId dashboardId : dashboards) {
Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId);
boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure
if (updated) {
dashboardService.saveDashboard(dashboard);
updatedCount++;
}
totalCount++;
var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "dashboards-resources-upgrade");
if (totalCount % 1000 == 0) {
log.info("Processed {} dashboards, updated {}", totalCount, updatedCount);
}
var dashboards = new PageDataIterable<>(dashboardService::findAllDashboardsIds, 512);
AtomicInteger totalCount = new AtomicInteger();
AtomicInteger updatedCount = new AtomicInteger();
for (DashboardId dashboardId : dashboards) {
executor.submit(() -> {
Dashboard dashboard = dashboardService.findDashboardById(TenantId.SYS_TENANT_ID, dashboardId);
boolean updated = resourceService.updateResourcesUsage(dashboard); // will convert resources ids to new structure
if (updated) {
dashboardService.saveDashboard(dashboard);
updatedCount.incrementAndGet();
}
if (totalCount.incrementAndGet() % 1000 == 0) {
log.info("Processed {} dashboards, updated {}", totalCount, updatedCount);
}
});
}
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
throw new RuntimeException("Dashboards resources update timeout"); // just in case, should happen
}
log.info("Updated {} dashboards", updatedCount);
}
@SneakyThrows
public void updateWidgetsResources() {
log.info("Updating resources usage in widgets");
int totalCount = 0;
int updatedCount = 0;
var executor = ThingsBoardExecutors.newLimitedTasksExecutor(4, 4, "widgets-resources-upgrade");
AtomicInteger totalCount = new AtomicInteger();
AtomicInteger updatedCount = new AtomicInteger();
var widgets = new PageDataIterable<>(widgetTypeService::findAllWidgetTypesIds, 512);
for (WidgetTypeId widgetTypeId : widgets) {
WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId);
boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails);
if (updated) {
widgetTypeService.saveWidgetType(widgetTypeDetails);
updatedCount++;
}
totalCount++;
executor.submit(() -> {
WidgetTypeDetails widgetTypeDetails = widgetTypeService.findWidgetTypeDetailsById(TenantId.SYS_TENANT_ID, widgetTypeId);
boolean updated = resourceService.updateResourcesUsage(widgetTypeDetails);
if (updated) {
widgetTypeService.saveWidgetType(widgetTypeDetails);
updatedCount.incrementAndGet();
}
if (totalCount.incrementAndGet() % 200 == 0) {
log.info("Processed {} widgets, updated {}", totalCount, updatedCount);
}
});
}
if (totalCount % 200 == 0) {
log.info("Processed {} widgets, updated {}", totalCount, updatedCount);
}
executor.shutdown();
if (!executor.awaitTermination(10, TimeUnit.MINUTES)) {
throw new RuntimeException("Widgets resources update timeout");
}
log.info("Updated {} widgets", updatedCount);
}

View File

@ -31,7 +31,7 @@ import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
@ -71,8 +71,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -90,16 +89,11 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
@Autowired
private EntityActionService entityActionService;
private ThreadPoolExecutor executor;
private ExecutorService executor;
@PostConstruct
private void initExecutor() {
if (executor == null) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(150_000),
ThingsBoardThreadFactory.forName("bulk-import"), new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
}
executor = ThingsBoardExecutors.newLimitedTasksExecutor(Runtime.getRuntime().availableProcessors(), 150_000, "bulk-import");
}
public final BulkImportResult<E> processBulkImport(BulkImportRequest request, SecurityUser user) throws Exception {
@ -287,8 +281,8 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
@PreDestroy
private void shutdownExecutor() {
if (!executor.isTerminating()) {
executor.shutdown();
if (executor != null) {
executor.shutdownNow();
}
}
@ -297,6 +291,7 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
private final Map<BulkImportColumnType, String> fields = new LinkedHashMap<>();
private final Map<BulkImportRequest.ColumnMapping, ParsedValue> kvs = new LinkedHashMap<>();
private int lineNumber;
}
@Data

View File

@ -17,6 +17,7 @@ package org.thingsboard.server.rules.flow;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -59,6 +60,7 @@ import org.thingsboard.server.dao.event.EventService;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.mockito.Mockito.spy;
@ -331,6 +333,15 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule
RuleChain finalRuleChain = rootRuleChain;
RuleNode lastRuleNode = secondaryMetaData.getNodes().stream().filter(node -> !node.getId().equals(finalRuleChain.getFirstRuleNodeId())).findFirst().get();
Awaitility.await().atMost(TIMEOUT, TimeUnit.SECONDS)
.until(() ->
getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000)
.getData()
.stream()
.filter(filterByPostTelemetryEventType())
.count() == 2
);
eventsPage = getDebugEvents(savedTenant.getId(), lastRuleNode.getId(), 1000);
events = eventsPage.getData().stream().filter(filterByPostTelemetryEventType()).collect(Collectors.toList());

View File

@ -40,7 +40,7 @@ public interface OAuth2ClientService extends EntityDaoService {
OAuth2Client findOAuth2ClientById(TenantId tenantId, OAuth2ClientId providerId);
String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName);
String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName, PlatformType platformType);
void deleteOAuth2ClientById(TenantId tenantId, OAuth2ClientId oAuth2ClientId);

View File

@ -63,6 +63,8 @@ public interface UserService extends EntityDaoService {
UserCredentials generateUserActivationToken(UserCredentials userCredentials);
UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials);
UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials);
void deleteUser(TenantId tenantId, User user);

View File

@ -16,9 +16,12 @@
package org.thingsboard.server.common.data.mobile;
import org.thingsboard.server.common.data.mobile.app.MobileAppVersionInfo;
import org.thingsboard.server.common.data.mobile.app.StoreInfo;
import org.thingsboard.server.common.data.oauth2.OAuth2ClientLoginInfo;
import java.util.List;
public record LoginMobileInfo(List<OAuth2ClientLoginInfo> oAuth2ClientLoginInfos, MobileAppVersionInfo versionInfo) {
public record LoginMobileInfo(List<OAuth2ClientLoginInfo> oAuth2ClientLoginInfos,
StoreInfo storeInfo,
MobileAppVersionInfo versionInfo) {
}

View File

@ -18,7 +18,13 @@ package org.thingsboard.server.common.data.mobile;
import com.fasterxml.jackson.databind.JsonNode;
import org.thingsboard.server.common.data.HomeDashboardInfo;
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.mobile.app.MobileAppVersionInfo;
import org.thingsboard.server.common.data.mobile.app.StoreInfo;
public record UserMobileInfo(User user, HomeDashboardInfo homeDashboardInfo, JsonNode pages) {
public record UserMobileInfo(User user,
StoreInfo storeInfo,
MobileAppVersionInfo versionInfo,
HomeDashboardInfo homeDashboardInfo,
JsonNode pages) {
}

View File

@ -313,6 +313,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
sendResponseForAdaptorErrorOrCloseContext(ctx, topicName, msgId);
}
break;
case SUBSCRIBE:
MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) msg;
processSubscribe(ctx, subscribeMessage);
break;
case UNSUBSCRIBE:
MqttUnsubscribeMessage unsubscribeMessage = (MqttUnsubscribeMessage) msg;
processUnsubscribe(ctx, unsubscribeMessage);
break;
case PINGREQ:
ctx.writeAndFlush(new MqttMessage(new MqttFixedHeader(PINGRESP, false, AT_MOST_ONCE, false, 0)));
break;
@ -750,7 +758,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) {
ctx.writeAndFlush(createSubAckMessage(mqttMsg.variableHeader().messageId(), Collections.singletonList(MqttReasonCodes.SubAck.NOT_AUTHORIZED.byteValue() & 0xFF)));
return;
}
@ -760,6 +768,16 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
for (MqttTopicSubscription subscription : mqttMsg.payload().topicSubscriptions()) {
String topic = subscription.topicName();
MqttQoS reqQoS = subscription.qualityOfService();
if (deviceSessionCtx.isProvisionOnly()) {
if (MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC.equals(topic)) {
registerSubQoS(topic, grantedQoSList, reqQoS);
} else {
log.debug("[{}] Failed to subscribe to [{}][{}]", sessionId, topic, reqQoS);
grantedQoSList.add(ReturnCodeResolver.getSubscriptionReturnCode(deviceSessionCtx.getMqttVersion(), MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID));
}
activityReported = true;
continue;
}
if (deviceSessionCtx.isDeviceSubscriptionAttributesTopic(topic)) {
processAttributesSubscribe(grantedQoSList, topic, reqQoS, TopicType.V1);
activityReported = true;
@ -822,7 +840,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:
@ -873,7 +890,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
}
private void processUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage mqttMsg) {
if (!checkConnected(ctx, mqttMsg)) {
if (!checkConnected(ctx, mqttMsg) && !deviceSessionCtx.isProvisionOnly()) {
ctx.writeAndFlush(createUnSubAckMessage(mqttMsg.variableHeader().messageId(),
Collections.singletonList((short) MqttReasonCodes.UnsubAck.NOT_AUTHORIZED.byteValue())));
return;
@ -887,6 +904,14 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
mqttQoSMap.remove(matcher);
try {
short resultValue = MqttReasonCodes.UnsubAck.SUCCESS.byteValue();
if (deviceSessionCtx.isProvisionOnly()) {
if (!matcher.matches(MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC)) {
resultValue = MqttReasonCodes.UnsubAck.TOPIC_FILTER_INVALID.byteValue();
}
unSubResults.add(resultValue);
activityReported = true;
continue;
}
switch (topicName) {
case MqttTopics.DEVICE_ATTRIBUTES_TOPIC:
case MqttTopics.DEVICE_ATTRIBUTES_SHORT_TOPIC:
@ -917,7 +942,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
case MqttTopics.GATEWAY_ATTRIBUTES_TOPIC:
case MqttTopics.GATEWAY_RPC_TOPIC:
case MqttTopics.GATEWAY_ATTRIBUTES_RESPONSE_TOPIC:
case MqttTopics.DEVICE_PROVISION_RESPONSE_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_RESPONSES_TOPIC:
case MqttTopics.DEVICE_FIRMWARE_ERROR_TOPIC:
case MqttTopics.DEVICE_SOFTWARE_RESPONSES_TOPIC:

View File

@ -17,6 +17,9 @@ package org.thingsboard.common.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThingsBoardExecutors {
@ -48,4 +51,16 @@ public class ThingsBoardExecutors {
return newWorkStealingPool(parallelism, clazz.getSimpleName());
}
/*
* executor with limited tasks queue size
* */
public static ExecutorService newLimitedTasksExecutor(int threads, int maxQueueSize, String name) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads,
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(maxQueueSize),
ThingsBoardThreadFactory.forName(name),
new ThreadPoolExecutor.CallerRunsPolicy());
executor.allowCoreThreadTimeOut(true);
return executor;
}
}

View File

@ -43,7 +43,6 @@ import java.util.Optional;
public class MobileAppServiceImpl extends AbstractEntityService implements MobileAppService {
private static final String PLATFORM_TYPE_IS_REQUIRED = "Platform type is required if package name is specified";
private static final String MOBILE_APP_BUNDLE_CONSTRAINT = "The mobile app referenced by the mobile bundle cannot be deleted!";
@Autowired
private MobileAppDao mobileAppDao;
@ -68,15 +67,8 @@ public class MobileAppServiceImpl extends AbstractEntityService implements Mobil
@Override
public void deleteMobileAppById(TenantId tenantId, MobileAppId mobileAppId) {
log.trace("Executing deleteMobileAppById [{}]", mobileAppId.getId());
try {
mobileAppDao.removeById(tenantId, mobileAppId.getId());
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(mobileAppId).build());
} catch (Exception e) {
checkConstraintViolation(e,
Map.of("fk_android_app_id", MOBILE_APP_BUNDLE_CONSTRAINT,
"fk_ios_app_id", MOBILE_APP_BUNDLE_CONSTRAINT));
throw e;
}
mobileAppDao.removeById(tenantId, mobileAppId.getId());
eventPublisher.publishEvent(DeleteEntityEvent.builder().tenantId(tenantId).entityId(mobileAppId).build());
}
@Override

View File

@ -38,7 +38,7 @@ public interface OAuth2ClientDao extends Dao<OAuth2Client> {
List<OAuth2Client> findByMobileAppBundleId(UUID mobileAppBundleId);
String findAppSecret(UUID id, String pkgName);
String findAppSecret(UUID id, String pkgName, PlatformType platformType);
void deleteByTenantId(UUID tenantId);

View File

@ -34,6 +34,7 @@ import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.eventsourcing.DeleteEntityEvent;
import org.thingsboard.server.dao.eventsourcing.SaveEntityEvent;
import org.thingsboard.server.dao.service.DataValidator;
import org.thingsboard.server.dao.service.Validator;
import java.util.Comparator;
import java.util.List;
@ -45,6 +46,8 @@ import java.util.stream.Collectors;
@Service("OAuth2ClientService")
public class OAuth2ClientServiceImpl extends AbstractEntityService implements OAuth2ClientService {
private static final String PLATFORM_TYPE_IS_REQUIRED = "Platform type is required if package name is specified";
@Autowired
private OAuth2ClientDao oauth2ClientDao;
@Autowired
@ -90,9 +93,10 @@ public class OAuth2ClientServiceImpl extends AbstractEntityService implements OA
}
@Override
public String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName) {
log.trace("Executing findAppSecret oAuth2ClientId = [{}] pkgName = [{}]", oAuth2ClientId, pkgName);
return oauth2ClientDao.findAppSecret(oAuth2ClientId.getId(), pkgName);
public String findAppSecret(OAuth2ClientId oAuth2ClientId, String pkgName, PlatformType platformType) {
log.trace("Executing findAppSecret oAuth2ClientId = [{}] pkgName = [{}], platform [{}]", oAuth2ClientId, pkgName, platformType);
Validator.checkNotNull(platformType, PLATFORM_TYPE_IS_REQUIRED);
return oauth2ClientDao.findAppSecret(oAuth2ClientId.getId(), pkgName, platformType);
}
@Override

View File

@ -80,8 +80,8 @@ public class JpaOAuth2ClientDao extends JpaAbstractDao<OAuth2ClientEntity, OAuth
}
@Override
public String findAppSecret(UUID id, String pkgName) {
return repository.findAppSecret(id, pkgName);
public String findAppSecret(UUID id, String pkgName, PlatformType platformType) {
return repository.findAppSecret(id, pkgName, platformType);
}
@Override

View File

@ -22,6 +22,7 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.oauth2.PlatformType;
import org.thingsboard.server.dao.model.sql.OAuth2ClientEntity;
import java.util.List;
@ -72,9 +73,10 @@ public interface OAuth2ClientRepository extends JpaRepository<OAuth2ClientEntity
"LEFT JOIN MobileAppBundleOauth2ClientEntity bc ON bc.mobileAppBundleId = b.id " +
"LEFT JOIN OAuth2ClientEntity c ON bc.oauth2ClientId = c.id " +
"WHERE c.id = :clientId " +
"AND a.pkgName = :pkgName")
"AND a.pkgName = :pkgName and a.platformType = :platformType")
String findAppSecret(@Param("clientId") UUID id,
@Param("pkgName") String pkgName);
@Param("pkgName") String pkgName,
@Param("platformType") PlatformType platformType);
@Transactional
@Modifying

View File

@ -663,7 +663,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
.append("nr.").append(fromOrTo).append("_id").append(" = re.").append(toOrFrom).append("_id")
.append(" and ")
.append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type");
notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query
notExistsPart.append(")");
whereFilter += " and ( r_int.lvl = " + entityFilter.getMaxLevel() + " OR " + notExistsPart.toString() + ")";
}
@ -755,7 +755,7 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
.append("nr.").append(fromOrTo).append("_type").append(" = re.").append(toOrFrom).append("_type")
.append(" and ")
.append(whereFilter.toString().replaceAll("re\\.", "nr\\."));
notExistsPart.append(" and nr.relation_type_group = 'COMMON'"); // hit the index, the same condition are on the recursive query
notExistsPart.append(")");
whereFilter.append(" and ( r_int.lvl = ").append(entityFilter.getMaxLevel()).append(" OR ").append(notExistsPart.toString()).append(")");
}

View File

@ -292,6 +292,16 @@ public class UserServiceImpl extends AbstractCachedEntityService<UserCacheKey, U
return userCredentials;
}
@Override
public UserCredentials checkUserActivationToken(TenantId tenantId, UserCredentials userCredentials) {
if (userCredentials.getActivationTokenTtl() < TimeUnit.MINUTES.toMillis(15)) { // renew link if less than 15 minutes before expiration
userCredentials = generateUserActivationToken(userCredentials);
userCredentials = saveUserCredentials(tenantId, userCredentials);
log.debug("[{}][{}] Regenerated expired user activation token", tenantId, userCredentials.getUserId());
}
return userCredentials;
}
@Override
public UserCredentials replaceUserCredentials(TenantId tenantId, UserCredentials userCredentials) {
log.trace("Executing replaceUserCredentials [{}]", userCredentials);

View File

@ -642,8 +642,8 @@ CREATE TABLE IF NOT EXISTS mobile_app_bundle (
ios_app_id uuid UNIQUE,
layout_config varchar(16384),
oauth2_enabled boolean,
CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id),
CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id)
CONSTRAINT fk_android_app_id FOREIGN KEY (android_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL,
CONSTRAINT fk_ios_app_id FOREIGN KEY (ios_app_id) REFERENCES mobile_app(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS domain_oauth2_client (

View File

@ -28,6 +28,7 @@ import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttReasonCodes;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttVersion;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@ -491,6 +492,48 @@ public class MqttClientTest extends AbstractContainerTest {
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
}
@Test
public void provisionRequestForCheckSubAckReceived() throws Exception {
DeviceProfile deviceProfile = testRestClient.getDeviceProfileById(device.getDeviceProfileId());
deviceProfile = updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.ALLOW_CREATE_NEW_DEVICES);
MqttMessageListener listener = new MqttMessageListener();
MqttClient mqttClient = getMqttClient("provision", listener, MqttVersion.MQTT_5);
final MqttReasonCodes.SubAck[] subAckResult = new MqttReasonCodes.SubAck[1];
mqttClient.setCallback(new MqttClientCallback() {
@Override
public void connectionLost(Throwable cause) {
}
@Override
public void onSuccessfulReconnect() {
}
@Override
public void onSubAck(MqttSubAckMessage subAckMessage) {
subAckResult[0] = subAckMessage.payload().typedReasonCodes().get(0);
}
}
);
mqttClient.on("/provision/response", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(2 * timeoutMultiplier);
assertThat(subAckResult[0]).isNotNull();
assertThat(MqttReasonCodes.SubAck.GRANTED_QOS_1.equals(subAckResult[0]));
subAckResult[0] = null;
mqttClient.on("v1/devices/me/attributes", listener, MqttQoS.AT_LEAST_ONCE).get(3 * timeoutMultiplier, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(2 * timeoutMultiplier);
assertThat(subAckResult[0]).isNotNull();
assertThat(MqttReasonCodes.SubAck.TOPIC_FILTER_INVALID.equals(subAckResult[0]));
testRestClient.deleteDeviceIfExists(device.getId());
updateDeviceProfileWithProvisioningStrategy(deviceProfile, DeviceProfileProvisionType.DISABLED);
}
@Test
public void provisionRequestForDeviceWithDisabledProvisioningStrategy() throws Exception {

View File

@ -21,6 +21,8 @@ import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@ -38,6 +40,7 @@ import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
@ -57,6 +60,7 @@ import static org.thingsboard.common.util.DonAsynchron.withCallback;
@RuleNode(type = ComponentType.ACTION,
name = "save to custom table",
configClazz = TbSaveToCustomCassandraTableNodeConfiguration.class,
version = 1,
nodeDescription = "Node stores data from incoming Message payload to the Cassandra database into the predefined custom table" +
" that should have <b>cs_tb_</b> prefix, to avoid the data insertion to the common TB tables.<br>" +
"<b>Note:</b> rule node can be used only for Cassandra DB.",
@ -87,11 +91,13 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
config = TbNodeUtils.convert(configuration, TbSaveToCustomCassandraTableNodeConfiguration.class);
cassandraCluster = ctx.getCassandraCluster();
if (cassandraCluster == null) {
throw new RuntimeException("Unable to connect to Cassandra database");
} else {
startExecutor();
saveStmt = getSaveStmt();
throw new TbNodeException("Unable to connect to Cassandra database", true);
}
if (!isTableExists()) {
throw new TbNodeException("Table '" + TABLE_PREFIX + config.getTableName() + "' does not exist in Cassandra cluster.");
}
startExecutor();
saveStmt = getSaveStmt();
}
@Override
@ -115,6 +121,12 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
}
}
private boolean isTableExists() {
var keyspaceMdOpt = getSession().getMetadata().getKeyspace(cassandraCluster.getKeyspaceName());
return keyspaceMdOpt.map(keyspaceMetadata ->
keyspaceMetadata.getTable(TABLE_PREFIX + config.getTableName()).isPresent()).orElse(false);
}
private PreparedStatement prepare(String query) {
return getSession().prepare(query);
}
@ -127,10 +139,10 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
return session;
}
private PreparedStatement getSaveStmt() {
private PreparedStatement getSaveStmt() throws TbNodeException {
fieldsMap = config.getFieldsMapping();
if (fieldsMap.isEmpty()) {
throw new RuntimeException("Fields(key,value) map is empty!");
throw new TbNodeException("Fields(key,value) map is empty!", true);
} else {
return prepareStatement(new ArrayList<>(fieldsMap.values()));
}
@ -163,16 +175,19 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
query.append("?, ");
}
}
if (config.getDefaultTtl() > 0) {
query.append(" USING TTL ?");
}
return query.toString();
}
private ListenableFuture<Void> save(TbMsg msg, TbContext ctx) {
JsonElement data = JsonParser.parseString(msg.getData());
if (!data.isJsonObject()) {
throw new IllegalStateException("Invalid message structure, it is not a JSON Object:" + data);
throw new IllegalStateException("Invalid message structure, it is not a JSON Object: " + data);
} else {
JsonObject dataAsObject = data.getAsJsonObject();
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(saveStmt.bind());
BoundStatementBuilder stmtBuilder = getStmtBuilder();
AtomicInteger i = new AtomicInteger(0);
fieldsMap.forEach((key, value) -> {
if (key.equals(ENTITY_ID)) {
@ -197,17 +212,24 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
} else if (dataKeyElement.isJsonObject()) {
stmtBuilder.setString(i.get(), dataKeyElement.getAsJsonObject().toString());
} else {
throw new IllegalStateException("Message data key: '" + key + "' with value: '" + value + "' is not a JSON Object or JSON Primitive!");
throw new IllegalStateException("Message data key: '" + key + "' with value: '" + dataKeyElement + "' is not a JSON Object or JSON Primitive!");
}
} else {
throw new RuntimeException("Message data doesn't contain key: " + "'" + key + "'!");
}
i.getAndIncrement();
});
if (config.getDefaultTtl() > 0) {
stmtBuilder.setInt(i.get(), config.getDefaultTtl());
}
return getFuture(executeAsyncWrite(ctx, stmtBuilder.build()), rs -> null);
}
}
BoundStatementBuilder getStmtBuilder() {
return new BoundStatementBuilder(saveStmt.bind());
}
private TbResultSetFuture executeAsyncWrite(TbContext ctx, Statement statement) {
return executeAsync(ctx, statement, defaultWriteLevel);
}
@ -240,4 +262,20 @@ public class TbSaveToCustomCassandraTableNode implements TbNode {
}, readResultsProcessingExecutor);
}
@Override
public TbPair<Boolean, JsonNode> upgrade(int fromVersion, JsonNode oldConfiguration) throws TbNodeException {
boolean hasChanges = false;
switch (fromVersion) {
case 0:
if (!oldConfiguration.has("defaultTtl")) {
hasChanges = true;
((ObjectNode) oldConfiguration).put("defaultTtl", 0);
}
break;
default:
break;
}
return new TbPair<>(hasChanges, oldConfiguration);
}
}

View File

@ -27,11 +27,13 @@ public class TbSaveToCustomCassandraTableNodeConfiguration implements NodeConfig
private String tableName;
private Map<String, String> fieldsMapping;
private int defaultTtl;
@Override
public TbSaveToCustomCassandraTableNodeConfiguration defaultConfiguration() {
TbSaveToCustomCassandraTableNodeConfiguration configuration = new TbSaveToCustomCassandraTableNodeConfiguration();
configuration.setDefaultTtl(0);
configuration.setTableName("");
Map<String, String> map = new HashMap<>();
map.put("", "");

View File

@ -0,0 +1,396 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.action;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatementBuilder;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.util.concurrent.SettableFuture;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.TestDbCallbackExecutor;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
import org.thingsboard.server.dao.cassandra.guava.GuavaSession;
import org.thingsboard.server.dao.nosql.CassandraStatementTask;
import org.thingsboard.server.dao.nosql.TbResultSet;
import org.thingsboard.server.dao.nosql.TbResultSetFuture;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.never;
import static org.mockito.BDDMockito.spy;
import static org.mockito.BDDMockito.then;
import static org.mockito.BDDMockito.willAnswer;
@ExtendWith(MockitoExtension.class)
public class TbSaveToCustomCassandraTableNodeTest extends AbstractRuleNodeUpgradeTest {
private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("ac4ca02e-2ae6-404a-8f7e-c4ae31c56aa7"));
private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("64ad971e-9cfa-49e4-9f59-faa1a2350c6e"));
private final ListeningExecutor dbCallbackExecutor = new TestDbCallbackExecutor();
private TbSaveToCustomCassandraTableNode node;
private TbSaveToCustomCassandraTableNodeConfiguration config;
@Mock
private TbContext ctxMock;
@Mock
private CassandraCluster cassandraClusterMock;
@Mock
private GuavaSession sessionMock;
@Mock
private PreparedStatement preparedStatementMock;
@Mock
private BoundStatement boundStatementMock;
@Mock
private BoundStatementBuilder boundStatementBuilderMock;
@Mock
private ColumnDefinitions columnDefinitionsMock;
@Mock
private CodecRegistry codecRegistryMock;
@Mock
private ProtocolVersion protocolVersionMock;
@Mock
private Node nodeMock;
@Mock
private Metadata metadataMock;
@Mock
private KeyspaceMetadata keyspaceMetadataMock;
@Mock
private TableMetadata tableMetadataMock;
@BeforeEach
public void setUp() {
node = spy(new TbSaveToCustomCassandraTableNode());
config = new TbSaveToCustomCassandraTableNodeConfiguration().defaultConfiguration();
}
@AfterEach
public void tearDown() {
node.destroy();
}
@Test
public void verifyDefaultConfig() {
assertThat(config.getTableName()).isEqualTo("");
assertThat(config.getFieldsMapping()).isEqualTo(Map.of("", ""));
assertThat(config.getDefaultTtl()).isEqualTo(0);
}
@Test
public void givenCassandraClusterIsMissing_whenInit_thenThrowsException() {
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Unable to connect to Cassandra database")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(true);
}
@Test
public void givenTableDoesNotExist_whenInit_thenThrowsException() {
config.setTableName("test_table");
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
mockCassandraCluster();
given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.empty());
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Table 'cs_tb_test_table' does not exist in Cassandra cluster.")
.extracting(e -> ((TbNodeException) e).isUnrecoverable())
.isEqualTo(false);
}
@Test
public void givenFieldsMapIsEmpty_whenInit_thenThrowsException() {
config.setTableName("test_table");
config.setFieldsMapping(emptyMap());
var configuration = new TbNodeConfiguration(JacksonUtil.valueToTree(config));
mockCassandraCluster();
assertThatThrownBy(() -> node.init(ctxMock, configuration))
.isInstanceOf(TbNodeException.class)
.hasMessage("Fields(key,value) map is empty!");
}
@Test
public void givenInvalidMessageStructure_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTableName("temperature_sensor");
config.setFieldsMapping(Map.of("temp", "temperature"));
mockOnInit();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(IllegalStateException.class)
.hasMessage("Invalid message structure, it is not a JSON Object: " + null);
}
@Test
public void givenDataKeyIsMissingInMsg_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTableName("temperature_sensor");
config.setFieldsMapping(Map.of("temp", "temperature"));
mockOnInit();
mockBoundStatement();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
"humidity": 77
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(RuntimeException.class)
.hasMessage("Message data doesn't contain key: 'temp'!");
}
@Test
public void givenUnsupportedData_whenOnMsg_thenThrowsException() throws TbNodeException {
config.setTableName("temperature_sensor");
config.setFieldsMapping(Map.of("temp", "temperature"));
mockOnInit();
mockBoundStatement();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
"temp": [value]
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
assertThatThrownBy(() -> node.onMsg(ctxMock, msg))
.isInstanceOf(RuntimeException.class)
.hasMessage("Message data key: 'temp' with value: '[\"value\"]' is not a JSON Object or JSON Primitive!");
}
@ParameterizedTest
@MethodSource
public void givenTtl_whenOnMsg_thenVerifyStatement(int ttlFromConfig,
String expectedQuery,
Consumer<BoundStatementBuilder> verifyBuilder) throws TbNodeException {
config.setTableName("readings");
config.setFieldsMapping(Map.of("$entityId", "entityIdTableColumn"));
config.setDefaultTtl(ttlFromConfig);
mockOnInit();
willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder();
given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.build()).willReturn(boundStatementMock);
mockSubmittingCassandraTask();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_JSON_OBJECT);
node.onMsg(ctxMock, msg);
then(sessionMock).should().prepare(expectedQuery);
verifyBuilder.accept(boundStatementBuilderMock);
}
private static Stream<Arguments> givenTtl_whenOnMsg_thenVerifyStatement() {
return Stream.of(
Arguments.of(0, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?)",
(Consumer<BoundStatementBuilder>) builder -> {
then(builder).should(never()).setInt(anyInt(), anyInt());
}),
Arguments.of(20, "INSERT INTO cs_tb_readings(entityIdTableColumn) VALUES(?) USING TTL ?",
(Consumer<BoundStatementBuilder>) builder -> {
then(builder).should().setInt(1, 20);
})
);
}
@Test
public void givenValidMsgStructure_whenOnMsg_thenVerifyMatchOfValuesInsertionOrderIntoStatementAndSaveToCustomCassandraTable() throws TbNodeException {
config.setDefaultTtl(25);
config.setTableName("readings");
Map<String, String> mappings = Map.of(
"$entityId", "entityIdTableColumn",
"doubleField", "doubleTableColumn",
"longField", "longTableColumn",
"booleanField", "booleanTableColumn",
"stringField", "stringTableColumn",
"jsonField", "jsonTableColumn"
);
config.setFieldsMapping(mappings);
mockOnInit();
mockBoundStatementBuilder();
mockSubmittingCassandraTask();
node.init(ctxMock, new TbNodeConfiguration(JacksonUtil.valueToTree(config)));
String data = """
{
"doubleField": 22.5,
"longField": 56,
"booleanField": true,
"stringField": "some string",
"jsonField": {
"key": "value"
}
}
""";
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, TbMsgMetaData.EMPTY, data);
node.onMsg(ctxMock, msg);
verifySettingStatementBuilder();
ArgumentCaptor<CassandraStatementTask> taskCaptor = ArgumentCaptor.forClass(CassandraStatementTask.class);
then(ctxMock).should().submitCassandraWriteTask(taskCaptor.capture());
CassandraStatementTask task = taskCaptor.getValue();
assertThat(task.getTenantId()).isEqualTo(TENANT_ID);
assertThat(task.getSession()).isEqualTo(sessionMock);
assertThat(task.getStatement()).isEqualTo(boundStatementMock);
await().atMost(1, TimeUnit.SECONDS).untilAsserted(
() -> then(ctxMock).should().tellSuccess(msg)
);
}
@Override
protected TbNode getTestNode() {
return node;
}
private static Stream<Arguments> givenFromVersionAndConfig_whenUpgrade_thenVerifyHasChangesAndConfig() {
return Stream.of(
// config for version 1 with upgrade from version 0
Arguments.of(0,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"}}",
true,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}"
),
// default config for version 1 with upgrade from version 1
Arguments.of(1,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}",
false,
"{\"tableName\":\"\",\"fieldsMapping\":{\"\":\"\"},\"defaultTtl\":0}"
)
);
}
private void mockOnInit() {
mockCassandraCluster();
given(cassandraClusterMock.getDefaultWriteConsistencyLevel()).willReturn(DefaultConsistencyLevel.ONE);
given(sessionMock.prepare(anyString())).willReturn(preparedStatementMock);
}
private void mockCassandraCluster() {
given(ctxMock.getCassandraCluster()).willReturn(cassandraClusterMock);
given(cassandraClusterMock.getSession()).willReturn(sessionMock);
given(sessionMock.getMetadata()).willReturn(metadataMock);
given(cassandraClusterMock.getKeyspaceName()).willReturn("test_keyspace");
given(metadataMock.getKeyspace(anyString())).willReturn(Optional.of(keyspaceMetadataMock));
given(keyspaceMetadataMock.getTable(anyString())).willReturn(Optional.of(tableMetadataMock));
}
private void mockBoundStatement() {
given(preparedStatementMock.bind()).willReturn(boundStatementMock);
given(boundStatementMock.getPreparedStatement()).willReturn(preparedStatementMock);
given(preparedStatementMock.getVariableDefinitions()).willReturn(columnDefinitionsMock);
given(boundStatementMock.codecRegistry()).willReturn(codecRegistryMock);
given(boundStatementMock.protocolVersion()).willReturn(protocolVersionMock);
given(boundStatementMock.getNode()).willReturn(nodeMock);
}
private void mockBoundStatementBuilder() {
willAnswer(invocation -> boundStatementBuilderMock).given(node).getStmtBuilder();
given(boundStatementBuilderMock.setUuid(anyInt(), any(UUID.class))).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setDouble(anyInt(), anyDouble())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setLong(anyInt(), anyLong())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setBoolean(anyInt(), anyBoolean())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setString(anyInt(), anyString())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.setInt(anyInt(), anyInt())).willReturn(boundStatementBuilderMock);
given(boundStatementBuilderMock.build()).willReturn(boundStatementMock);
}
private void mockSubmittingCassandraTask() {
given(ctxMock.getTenantId()).willReturn(TENANT_ID);
willAnswer(invocation -> {
SettableFuture<TbResultSet> mainFuture = SettableFuture.create();
mainFuture.set(new TbResultSet(null, null, null));
return new TbResultSetFuture(mainFuture);
}).given(ctxMock).submitCassandraWriteTask(any());
given(ctxMock.getDbCallbackExecutor()).willReturn(dbCallbackExecutor);
}
private void verifySettingStatementBuilder() {
Map<String, String> fieldsMap = (Map<String, String>) ReflectionTestUtils.getField(node, "fieldsMap");
List<String> values = new ArrayList<>(fieldsMap.values());
then(boundStatementBuilderMock).should().setUuid(values.indexOf("entityIdTableColumn"), DEVICE_ID.getId());
then(boundStatementBuilderMock).should().setDouble(values.indexOf("doubleTableColumn"), 22.5);
then(boundStatementBuilderMock).should().setLong(values.indexOf("longTableColumn"), 56L);
then(boundStatementBuilderMock).should().setBoolean(values.indexOf("booleanTableColumn"), true);
then(boundStatementBuilderMock).should().setString(values.indexOf("stringTableColumn"), "some string");
then(boundStatementBuilderMock).should().setString(values.indexOf("jsonTableColumn"), "{\"key\":\"value\"}");
then(boundStatementBuilderMock).should().setInt(values.size(), 25);
}
}